// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#undef _FORTIFY_SOURCE
// If _FORTIFY_SOURCE is defined, longjmp will complain when it detects the stack
// pointer moving in the "wrong direction", thinking you're jumping to a non-existent
// stack frame. But we use longjmp to jump between different stacks to implement fibers,
// so this check isn't appropriate for us.
#if _WIN32 || __CYGWIN__
#include "win32-api-version.h"
#elif __APPLE__
// getcontext() and friends are marked deprecated on MacOS but seemingly no replacement is
// provided. It appears as if they deprecated it solely because the standards bodies deprecated it,
// which they seemingly did mainly because the proper semantics are too difficult for them to
// define. I doubt MacOS would actually remove these functions as they are widely used. But if they
// do, then I guess we'll need to fall back to using setjmp()/longjmp(), and some sort of hack
// involving sigaltstack() (and generating a fake signal I guess) in order to initialize the fiber
// in the first place. Or we could use assembly, I suppose. Either way, ick.
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#define _XOPEN_SOURCE // Must be defined to see getcontext() on MacOS.
#endif
#include "async.h"
#include "debug.h"
#include "vector.h"
#include "threadlocal.h"
#include "mutex.h"
#include "one-of.h"
#include "function.h"
#include "list.h"
#include <deque>
#include <atomic>
#if _WIN32 || __CYGWIN__
#include <windows.h> // for Sleep(0) and fibers
#include "windows-sanity.h"
#else
#if KJ_USE_FIBERS
#include <ucontext.h>
#include <setjmp.h> // for fibers
#endif
#include <sys/mman.h> // mmap(), for allocating new stacks
#include <unistd.h> // sysconf()
#include <errno.h>
#endif
#if !_WIN32
#include <sched.h> // just for sched_yield()
#endif
#if !KJ_NO_RTTI
#include <typeinfo>
#if __GNUC__
#include <cxxabi.h>
#endif
#endif
#include <stdlib.h>
#if KJ_HAS_COMPILER_FEATURE(address_sanitizer)
// Clang's address sanitizer requires special hints when switching fibers, especially in order for
// stack-use-after-return handling to work right.
//
// TODO(someday): Does GCC's sanitizer, flagged by __SANITIZE_ADDRESS__, have these hints too? I
// don't know and am not in a position to test, so I'm assuming not for now.
#include <sanitizer/common_interface_defs.h>
#else
// Nop the hints so that we don't have to put #ifdefs around every use.
#define __sanitizer_start_switch_fiber(...)
#define __sanitizer_finish_switch_fiber(...)
#endif
#if _MSC_VER && !__clang__
// MSVC's atomic intrinsics are weird and different, whereas the C++ standard atomics match the GCC
// builtins -- except for requiring the obnoxious std::atomic<T> wrapper. So, on MSVC let's just
// #define the builtins based on the C++ library, reinterpret-casting native types to
// std::atomic... this is cheating but ugh, whatever.
template <typename T>
static std::atomic<T>* reinterpretAtomic(T* ptr) { return reinterpret_cast<std::atomic<T>*>(ptr); }
#define __atomic_store_n(ptr, val, order) \
std::atomic_store_explicit(reinterpretAtomic(ptr), val, order)
#define __atomic_load_n(ptr, order) \
std::atomic_load_explicit(reinterpretAtomic(ptr), order)
#define __atomic_compare_exchange_n(ptr, expected, desired, weak, succ, fail) \
std::atomic_compare_exchange_strong_explicit( \
reinterpretAtomic(ptr), expected, desired, succ, fail)
#define __atomic_exchange_n(ptr, val, order) \
std::atomic_exchange_explicit(reinterpretAtomic(ptr), val, order)
#define __ATOMIC_RELAXED std::memory_order_relaxed
#define __ATOMIC_ACQUIRE std::memory_order_acquire
#define __ATOMIC_RELEASE std::memory_order_release
#endif
namespace kj {
namespace {
KJ_THREADLOCAL_PTR(EventLoop) threadLocalEventLoop = nullptr;
#define _kJ_ALREADY_READY reinterpret_cast< ::kj::_::Event*>(1)
EventLoop& currentEventLoop() {
EventLoop* loop = threadLocalEventLoop;
KJ_REQUIRE(loop != nullptr, "No event loop is running on this thread.");
return *loop;
}
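// Event used as the root of a wait: it does nothing when fired except record that it fired,
// so waitImpl() / pollImpl() below can detect that the awaited promise completed.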
class RootEvent: public _::Event {
public:
RootEvent(_::PromiseNode* node, void* traceAddr, SourceLocation location)
: Event(location), node(node), traceAddr(traceAddr) {}
bool fired = false;
Maybe<Own<_::Event>> fire() override {
fired = true;
return nullptr;
}
void traceEvent(_::TraceBuilder& builder) override {
node->tracePromise(builder, true);
builder.add(traceAddr);
}
private:
_::PromiseNode* node;
void* traceAddr;
};
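// Placeholder functor type, used only so that the addresses of evalLater<DummyFunctor>() and
// evalLast<DummyFunctor>() can be reported in the promise traces below.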
struct DummyFunctor {
void operator()() {};
};
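// Promise node that is immediately ready but arms its waiting event breadth-first, so that
// events already in the queue get a chance to run first; used by yield() below.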
class YieldPromiseNode final: public _::PromiseNode {
public:
void onReady(_::Event* event) noexcept override {
if (event) event->armBreadthFirst();
}
void get(_::ExceptionOrValue& output) noexcept override {
output.as<_::Void>() = _::Void();
}
void tracePromise(_::TraceBuilder& builder, bool stopAtNextEvent) override {
builder.add(reinterpret_cast<void*>(&kj::evalLater<DummyFunctor>));
}
};
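// Like YieldPromiseNode, but arms the waiting event with armLast() so it runs only after
// everything else currently in the queue; used by yieldHarder() below.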
class YieldHarderPromiseNode final: public _::PromiseNode {
public:
void onReady(_::Event* event) noexcept override {
if (event) event->armLast();
}
void get(_::ExceptionOrValue& output) noexcept override {
output.as<_::Void>() = _::Void();
}
void tracePromise(_::TraceBuilder& builder, bool stopAtNextEvent) override {
builder.add(reinterpret_cast<void*>(&kj::evalLast<DummyFunctor>));
}
};
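// Promise node that never becomes ready; backs kj::NEVER_DONE (see neverDone() below).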
class NeverDonePromiseNode final: public _::PromiseNode {
public:
void onReady(_::Event* event) noexcept override {
// ignore
}
void get(_::ExceptionOrValue& output) noexcept override {
KJ_FAIL_REQUIRE("Not ready.");
}
void tracePromise(_::TraceBuilder& builder, bool stopAtNextEvent) override {
builder.add(_::getMethodStartAddress(kj::NEVER_DONE, &_::NeverDone::wait));
}
};
} // namespace
// =======================================================================================
void END_CANCELER_STACK_START_CANCELEE_STACK() {}
// Dummy symbol used when reporting how a Canceler was canceled. We end up combining two stack
// traces into one and we use this as a separator.
Canceler::~Canceler() noexcept(false) {
if (isEmpty()) return;
cancel(getDestructionReason(
reinterpret_cast<void*>(&END_CANCELER_STACK_START_CANCELEE_STACK),
Exception::Type::DISCONNECTED, __FILE__, __LINE__, "operation canceled"_kj));
}
void Canceler::cancel(StringPtr cancelReason) {
if (isEmpty()) return;
// We can't use getDestructionReason() here because if an exception is in-flight, it would use
// that exception, totally discarding the reason given by the caller. This would probably be
// unexpected. The caller can always use getDestructionReason() themselves if desired.
cancel(Exception(Exception::Type::DISCONNECTED, __FILE__, __LINE__, kj::str(cancelReason)));
}
void Canceler::cancel(const Exception& exception) {
for (;;) {
KJ_IF_MAYBE(a, list) {
a->unlink();
a->cancel(kj::cp(exception));
} else {
break;
}
}
}
void Canceler::release() {
for (;;) {
KJ_IF_MAYBE(a, list) {
a->unlink();
} else {
break;
}
}
}
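// Link the new adapter at the head of the Canceler's intrusive doubly-linked list.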
Canceler::AdapterBase::AdapterBase(Canceler& canceler)
: prev(canceler.list),
next(canceler.list) {
canceler.list = *this;
KJ_IF_MAYBE(n, next) {
n->prev = next;
}
}
Canceler::AdapterBase::~AdapterBase() noexcept(false) {
unlink();
}
void Canceler::AdapterBase::unlink() {
KJ_IF_MAYBE(p, prev) {
*p = next;
}
KJ_IF_MAYBE(n, next) {
n->prev = prev;
}
next = nullptr;
prev = nullptr;
}
Canceler::AdapterImpl<void>::AdapterImpl(kj::PromiseFulfiller<void>& fulfiller,
Canceler& canceler, kj::Promise<void> inner)
: AdapterBase(canceler),
fulfiller(fulfiller),
inner(inner.then(
[&fulfiller]() { fulfiller.fulfill(); },
[&fulfiller](kj::Exception&& e) { fulfiller.reject(kj::mv(e)); })
.eagerlyEvaluate(nullptr)) {}
void Canceler::AdapterImpl<void>::cancel(kj::Exception&& e) {
fulfiller.reject(kj::mv(e));
inner = nullptr;
}
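// Typical usage, as a sketch (wrap() is declared in async.h, not shown in this file): an owner
// wraps promises so they can all be rejected at once when the Canceler is canceled or destroyed:
//
//   kj::Canceler canceler;
//   kj::Promise<void> p = canceler.wrap(startSomeOperation());  // hypothetical operation
//   ...
//   canceler.cancel("shutting down");  // rejects p with a DISCONNECTED exception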
// =======================================================================================
TaskSet::TaskSet(TaskSet::ErrorHandler& errorHandler, SourceLocation location)
: errorHandler(errorHandler), location(location) {}
class TaskSet::Task final: public _::Event {
public:
Task(TaskSet& taskSet, Own<_::PromiseNode>&& nodeParam)
: Event(taskSet.location), taskSet(taskSet), node(kj::mv(nodeParam)) {
node->setSelfPointer(&node);
node->onReady(this);
}
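// Unlink this task from the TaskSet's list and transfer ownership of it to the caller.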
Own<Task> pop() {
KJ_IF_MAYBE(n, next) { n->get()->prev = prev; }
Own<Task> self = kj::mv(KJ_ASSERT_NONNULL(*prev));
KJ_ASSERT(self.get() == this);
*prev = kj::mv(next);
next = nullptr;
prev = nullptr;
return self;
}
Maybe<Own<Task>> next;
Maybe<Own<Task>>* prev = nullptr;
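// `next` owns the following task in the list; `prev` points at the Maybe<Own<Task>> that owns
// this task (either TaskSet::tasks or the previous task's `next`).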
kj::String trace() {
void* space[32];
_::TraceBuilder builder(space);
node->tracePromise(builder, false);
return kj::str("task: ", builder);
}
protected:
Maybe<Own<Event>> fire() override {
// Get the result.
_::ExceptionOr<_::Void> result;
node->get(result);
// Delete the node, catching any exceptions.
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() {
node = nullptr;
})) {
result.addException(kj::mv(*exception));
}
// Call the error handler if there was an exception.
KJ_IF_MAYBE(e, result.exception) {
taskSet.errorHandler.taskFailed(kj::mv(*e));
}
// Remove from the task list.
auto self = pop();
KJ_IF_MAYBE(f, taskSet.emptyFulfiller) {
if (taskSet.tasks == nullptr) {
f->get()->fulfill();
taskSet.emptyFulfiller = nullptr;
}
}
return mv(self);
}
void traceEvent(_::TraceBuilder& builder) override {
// Pointing out the ErrorHandler's taskFailed() implementation will usually identify the
// particular TaskSet that contains this event.
builder.add(_::getMethodStartAddress(taskSet.errorHandler, &ErrorHandler::taskFailed));
}
private:
TaskSet& taskSet;
Own<_::PromiseNode> node;
};
TaskSet::~TaskSet() noexcept(false) {
// You could argue it is dubious, but some applications would like for the destructor of a
// task to be able to schedule new tasks. So when we cancel our tasks... we might find new
// tasks added! We have to cancel repeatedly until the list is empty. We also pop the tasks in
// a loop, rather than letting the list destroy itself recursively, to avoid stack overflow.
while (tasks != nullptr) {
auto removed = KJ_REQUIRE_NONNULL(tasks)->pop();
}
}
void TaskSet::add(Promise<void>&& promise) {
auto task = heap<Task>(*this, _::PromiseNode::from(kj::mv(promise)));
KJ_IF_MAYBE(head, tasks) {
head->get()->prev = &task->next;
task->next = kj::mv(tasks);
}
task->prev = &tasks;
tasks = kj::mv(task);
}
kj::String TaskSet::trace() {
kj::Vector<kj::String> traces;
Maybe<Own<Task>>* ptr = &tasks;
for (;;) {
KJ_IF_MAYBE(task, *ptr) {
traces.add(task->get()->trace());
ptr = &task->get()->next;
} else {
break;
}
}
return kj::strArray(traces, "\n");
}
Promise<void> TaskSet::onEmpty() {
KJ_IF_MAYBE(fulfiller, emptyFulfiller) {
if (fulfiller->get()->isWaiting()) {
KJ_FAIL_REQUIRE("onEmpty() can only be called once at a time");
}
}
if (tasks == nullptr) {
return READY_NOW;
} else {
auto paf = newPromiseAndFulfiller<void>();
emptyFulfiller = kj::mv(paf.fulfiller);
return kj::mv(paf.promise);
}
}
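// Typical usage, as a sketch (add() and onEmpty() are declared in async.h): an application
// keeps a TaskSet of fire-and-forget promises and, at shutdown, waits for it to drain:
//
//   class Handler final: public kj::TaskSet::ErrorHandler {
//     void taskFailed(kj::Exception&& e) override { KJ_LOG(ERROR, e); }
//   };
//   Handler handler;
//   kj::TaskSet tasks(handler);
//   tasks.add(doSomethingAsync());   // hypothetical promise-returning call
//   tasks.onEmpty().wait(waitScope); // resolves once all added tasks have completed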
// =======================================================================================
namespace {
#if _WIN32 || __CYGWIN__
thread_local void* threadMainFiber = nullptr;
void* getMainWin32Fiber() {
return threadMainFiber;
}
#endif
inline void ensureThreadCanRunFibers() {
#if _WIN32 || __CYGWIN__
// Make sure the current thread has been converted to a fiber.
void* fiber = threadMainFiber;
if (fiber == nullptr) {
// Thread not initialized. Convert it to a fiber now.
// Note: Unfortunately, if the application has already converted the thread to a fiber, I
// guess this will fail. But trying to call GetCurrentFiber() when the thread isn't a fiber
// doesn't work (it returns null on WINE but not on real windows, ugh). So I guess we're
// just incompatible with the application doing anything with fibers, which is sad.
threadMainFiber = fiber = ConvertThreadToFiber(nullptr);
}
#endif
}
} // namespace
namespace _ {
class FiberStack final {
// A class containing a fiber stack impl. This is separate from fiber
// promises since it lets us move the stack itself around and reuse it.
public:
FiberStack(size_t stackSize);
~FiberStack() noexcept(false);
struct SynchronousFunc {
kj::FunctionParam<void()>& func;
kj::Maybe<kj::Exception> exception;
};
void initialize(FiberBase& fiber);
void initialize(SynchronousFunc& syncFunc);
void reset() {
main = {};
}
void switchToFiber();
void switchToMain();
void trace(TraceBuilder& builder) {
// TODO(someday): Trace through fiber stack? Can it be done???
builder.add(getMethodStartAddress(*this, &FiberStack::trace));
}
private:
size_t stackSize;
OneOf<FiberBase*, SynchronousFunc*> main;
friend class FiberBase;
friend class FiberPool::Impl;
struct StartRoutine;
#if KJ_USE_FIBERS
#if _WIN32 || __CYGWIN__
void* osFiber;
#else
struct Impl;
Impl* impl;
#endif
#endif
[[noreturn]] void run();
bool isReset() { return main == nullptr; }
};
} // namespace _
#if __linux__
// TODO(someday): Support core-local freelists on OSs other than Linux. The only tricky part is
// finding what to use instead of sched_getcpu() to get the current CPU ID.
#define USE_CORE_LOCAL_FREELISTS 1
#endif
#if USE_CORE_LOCAL_FREELISTS
static const size_t CACHE_LINE_SIZE = 64;
// Most modern architectures have 64-byte cache lines.
#endif
class FiberPool::Impl final: private Disposer {
public:
Impl(size_t stackSize): stackSize(stackSize) {}
~Impl() noexcept(false) {
#if USE_CORE_LOCAL_FREELISTS
if (coreLocalFreelists != nullptr) {
KJ_DEFER(free(coreLocalFreelists));
for (uint i: kj::zeroTo(nproc)) {
for (auto stack: coreLocalFreelists[i].stacks) {
if (stack != nullptr) {
delete stack;
}
}
}
}
#endif
// Make sure we're not leaking anything from the global freelist either.
auto lock = freelist.lockExclusive();
auto dangling = kj::mv(*lock);
for (auto& stack: dangling) {
delete stack;
}
}
void setMaxFreelist(size_t count) {
maxFreelist = count;
}
size_t getFreelistSize() const {
return freelist.lockShared()->size();
}
void useCoreLocalFreelists() {
#if USE_CORE_LOCAL_FREELISTS
if (coreLocalFreelists != nullptr) {
// Ignore repeat call.
return;
}
int nproc_;
KJ_SYSCALL(nproc_ = sysconf(_SC_NPROCESSORS_CONF));
nproc = nproc_;
void* allocPtr;
size_t totalSize = nproc * sizeof(CoreLocalFreelist);
int error = posix_memalign(&allocPtr, CACHE_LINE_SIZE, totalSize);
if (error != 0) {
KJ_FAIL_SYSCALL("posix_memalign", error);
}
memset(allocPtr, 0, totalSize);
coreLocalFreelists = reinterpret_cast<CoreLocalFreelist*>(allocPtr);
#endif
}
Own<_::FiberStack> takeStack() const {
// Get a stack from the pool. The disposer on the returned Own pointer will return the stack
// to the pool, provided that reset() has been called to indicate that the stack is not in
// a weird state.
#if USE_CORE_LOCAL_FREELISTS
KJ_IF_MAYBE(core, lookupCoreLocalFreelist()) {
for (auto& stackPtr: core->stacks) {
_::FiberStack* result = __atomic_exchange_n(&stackPtr, nullptr, __ATOMIC_ACQUIRE);
if (result != nullptr) {
// Found a stack in this slot!
return { result, *this };
}
}
// No stacks found, fall back to global freelist.
}
#endif
{
auto lock = freelist.lockExclusive();
if (!lock->empty()) {
_::FiberStack* result = lock->back();
lock->pop_back();
return { result, *this };
}
}
_::FiberStack* result = new _::FiberStack(stackSize);
return { result, *this };
}
private:
size_t stackSize;
size_t maxFreelist = kj::maxValue;
MutexGuarded<std::deque<_::FiberStack*>> freelist;
#if USE_CORE_LOCAL_FREELISTS
struct CoreLocalFreelist {
union {
_::FiberStack* stacks[2];
// For now, we don't try to freelist more than 2 stacks per core. If you have three or more
// threads interleaved on a core, chances are you have bigger problems...
byte padToCacheLine[CACHE_LINE_SIZE];
// We don't want two core-local freelists to live in the same cache line, otherwise the
// cores will fight over ownership of that line.
};
};
uint nproc;
CoreLocalFreelist* coreLocalFreelists = nullptr;
kj::Maybe<CoreLocalFreelist&> lookupCoreLocalFreelist() const {
if (coreLocalFreelists == nullptr) {
return nullptr;
} else {
int cpu = sched_getcpu();
if (cpu >= 0) {
// TODO(perf): Perhaps two hyperthreads on the same physical core should share a freelist?
// But I don't know how to find out if the system uses hyperthreading.
return coreLocalFreelists[cpu];
} else {
static bool logged = false;
if (!logged) {
KJ_LOG(ERROR, "invalid cpu number from sched_getcpu()?", cpu, nproc);
logged = true;
}
return nullptr;
}
}
}
#endif
void disposeImpl(void* pointer) const {
_::FiberStack* stack = reinterpret_cast<_::FiberStack*>(pointer);
KJ_DEFER(delete stack);
// Verify that the stack was reset before returning, otherwise it might be in a weird state
// where we don't want to reuse it.
if (stack->isReset()) {
#if USE_CORE_LOCAL_FREELISTS
KJ_IF_MAYBE(core, lookupCoreLocalFreelist()) {
for (auto& stackPtr: core->stacks) {
stack = __atomic_exchange_n(&stackPtr, stack, __ATOMIC_RELEASE);
if (stack == nullptr) {
// Cool, we inserted the stack into an unused slot. We're done.
return;
}
}
// All slots were occupied, so we inserted the new stack in the front, pushed the rest back,
// and now `stack` refers to the stack that fell off the end of the core-local list. That
// needs to go into the global freelist.
}
#endif
auto lock = freelist.lockExclusive();
lock->push_back(stack);
if (lock->size() > maxFreelist) {
stack = lock->front();
lock->pop_front();
} else {
stack = nullptr;
}
}
}
};
FiberPool::FiberPool(size_t stackSize) : impl(kj::heap<FiberPool::Impl>(stackSize)) {}
FiberPool::~FiberPool() noexcept(false) {}
void FiberPool::setMaxFreelist(size_t count) {
impl->setMaxFreelist(count);
}
size_t FiberPool::getFreelistSize() const {
return impl->getFreelistSize();
}
void FiberPool::useCoreLocalFreelists() {
impl->useCoreLocalFreelists();
}
void FiberPool::runSynchronously(kj::FunctionParam<void()> func) const {
ensureThreadCanRunFibers();
_::FiberStack::SynchronousFunc syncFunc { func, nullptr };
{
auto stack = impl->takeStack();
stack->initialize(syncFunc);
stack->switchToFiber();
stack->reset(); // safe to reuse
}
KJ_IF_MAYBE(e, syncFunc.exception) {
kj::throwRecoverableException(kj::mv(*e));
}
}
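// Usage sketch: runSynchronously() executes a function on a pooled fiber stack and returns (or
// rethrows) once it finishes, e.g.:
//
//   kj::FiberPool pool(65536);   // 64 KiB stacks (FiberStack enforces this as the minimum)
//   pool.runSynchronously([]() {
//     recurseDeeply();           // hypothetical deep-recursion workload
//   });                          // back on the caller's own stack here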
namespace _ { // private
class LoggingErrorHandler: public TaskSet::ErrorHandler {
public:
static LoggingErrorHandler instance;
void taskFailed(kj::Exception&& exception) override {
KJ_LOG(ERROR, "Uncaught exception in daemonized task.", exception);
}
};
LoggingErrorHandler LoggingErrorHandler::instance = LoggingErrorHandler();
} // namespace _ (private)
// =======================================================================================
struct Executor::Impl {
Impl(EventLoop& loop): state(loop) {}
struct State {
// Queues of notifications from other threads that need this thread's attention.
State(EventLoop& loop): loop(loop) {}
kj::Maybe<EventLoop&> loop;
// Becomes null when the loop is destroyed.
List<_::XThreadEvent, &_::XThreadEvent::targetLink> start;
List<_::XThreadEvent, &_::XThreadEvent::targetLink> cancel;
List<_::XThreadEvent, &_::XThreadEvent::replyLink> replies;
// Lists of events that need actioning by this thread.
List<_::XThreadEvent, &_::XThreadEvent::targetLink> executing;
// Events that have already been dispatched and are happily executing. This list is maintained
// so that they can be canceled if the event loop exits.
List<_::XThreadPaf, &_::XThreadPaf::link> fulfilled;
// Set of XThreadPafs that have been fulfilled by another thread.
bool waitingForCancel = false;
// True if this thread is currently blocked waiting for some other thread to pump its
// cancellation queue. If that other thread tries to block on *this* thread, then it could
// deadlock -- it must take precautions against this.
bool isDispatchNeeded() const {
return !start.empty() || !cancel.empty() || !replies.empty() || !fulfilled.empty();
}
void dispatchAll(Vector<_::XThreadEvent*>& eventsToCancelOutsideLock) {
for (auto& event: start) {
start.remove(event);
executing.add(event);
event.state = _::XThreadEvent::EXECUTING;
event.armBreadthFirst();
}
dispatchCancels(eventsToCancelOutsideLock);
for (auto& event: replies) {
replies.remove(event);
event.onReadyEvent.armBreadthFirst();
}
for (auto& event: fulfilled) {
fulfilled.remove(event);
event.state = _::XThreadPaf::DISPATCHED;
event.onReadyEvent.armBreadthFirst();
}
}
void dispatchCancels(Vector<_::XThreadEvent*>& eventsToCancelOutsideLock) {
for (auto& event: cancel) {
cancel.remove(event);
if (event.promiseNode == nullptr) {
event.setDoneState();
} else {
// We can't destroy the promiseNode while the mutex is locked, because we don't know
// what the destructor might do. But, we *must* destroy it before acknowledging
// cancellation. So we have to add it to a list to destroy later.
eventsToCancelOutsideLock.add(&event);
}
}
}
};
kj::MutexGuarded<State> state;
// After modifying state from another thread, the loop's port.wake() must be called.
void processAsyncCancellations(Vector<_::XThreadEvent*>& eventsToCancelOutsideLock) {
// After calling dispatchAll() or dispatchCancels() with the lock held, it may be that some
// cancellations require dropping the lock before destroying the promiseNode. In that case
// those cancellations will be added to the eventsToCancelOutsideLock Vector passed to the
// method. That vector must then be passed to processAsyncCancellations() as soon as the lock
// is released.
for (auto& event: eventsToCancelOutsideLock) {
event->promiseNode = nullptr;
event->disarm();
}
// Now we need to mark all the events "done" under lock.
auto lock = state.lockExclusive();
for (auto& event: eventsToCancelOutsideLock) {
event->setDoneState();
}
}
void disconnect() {
state.lockExclusive()->loop = nullptr;
// Now that `loop` is set null in `state`, other threads will no longer try to manipulate our
// lists, so we can access them without a lock. That's convenient because a bunch of the things
// we want to do with them would require dropping the lock to avoid deadlocks. We'd end up
// copying all the lists over into separate vectors first, dropping the lock, operating on
// them, and then locking again.
auto& s = state.getWithoutLock();
// We do, however, take and release the lock on the way out, to make sure anyone performing
// a conditional wait for state changes gets a chance to have their wait condition re-checked.
KJ_DEFER(state.lockExclusive());
for (auto& event: s.start) {
KJ_ASSERT(event.state == _::XThreadEvent::QUEUED, event.state) { break; }
s.start.remove(event);
event.setDisconnected();
event.sendReply();
event.setDoneState();
}
for (auto& event: s.executing) {
KJ_ASSERT(event.state == _::XThreadEvent::EXECUTING, event.state) { break; }
s.executing.remove(event);
event.promiseNode = nullptr;
event.setDisconnected();
event.sendReply();
event.setDoneState();
}
for (auto& event: s.cancel) {
KJ_ASSERT(event.state == _::XThreadEvent::CANCELING, event.state) { break; }
s.cancel.remove(event);
event.promiseNode = nullptr;
event.setDoneState();
}
// The replies list "should" be empty, because any locally-initiated tasks should have been
// canceled before destroying the EventLoop.
if (!s.replies.empty()) {
KJ_LOG(ERROR, "EventLoop destroyed with cross-thread event replies outstanding");
for (auto& event: s.replies) {
s.replies.remove(event);
}
}
// Similarly for cross-thread fulfillers. The waiting tasks should have been canceled.
if (!s.fulfilled.empty()) {
KJ_LOG(ERROR, "EventLoop destroyed with cross-thread fulfiller replies outstanding");
for (auto& event: s.fulfilled) {
s.fulfilled.remove(event);
event.state = _::XThreadPaf::DISPATCHED;
}
}
}
};
namespace _ { // (private)
XThreadEvent::XThreadEvent(
ExceptionOrValue& result, const Executor& targetExecutor, void* funcTracePtr,
SourceLocation location)
: Event(targetExecutor.getLoop(), location), result(result), funcTracePtr(funcTracePtr),
targetExecutor(targetExecutor.addRef()) {}
void XThreadEvent::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) {
// We can't safely trace into another thread, so we'll stop here.
builder.add(funcTracePtr);
}
void XThreadEvent::ensureDoneOrCanceled() {
if (__atomic_load_n(&state, __ATOMIC_ACQUIRE) != DONE) {
auto lock = targetExecutor->impl->state.lockExclusive();
const EventLoop* loop;
KJ_IF_MAYBE(l, lock->loop) {
loop = l;
} else {
// Target event loop is already dead, so we know it's already working on transitioning all
// events to the DONE state. We can just wait.
lock.wait([&](auto&) { return state == DONE; });
return;
}
switch (state) {
case UNUSED:
// Nothing to do.
break;
case QUEUED:
lock->start.remove(*this);
// No wake needed since we removed work rather than adding it.
state = DONE;
break;
case EXECUTING: {
lock->executing.remove(*this);
lock->cancel.add(*this);
state = CANCELING;
KJ_IF_MAYBE(p, loop->port) {
p->wake();
}
Maybe<Executor&> maybeSelfExecutor = nullptr;
if (threadLocalEventLoop != nullptr) {
KJ_IF_MAYBE(e, threadLocalEventLoop->executor) {
maybeSelfExecutor = **e;
}
}
KJ_IF_MAYBE(selfExecutor, maybeSelfExecutor) {
// If, while waiting for other threads to process our cancellation request, we have
// cancellation requests queued back to this thread, we must process them. Otherwise,
// we could deadlock with two threads waiting on each other to process cancellations.
//
// We don't have a terribly good way to detect this, except to check if the remote
// thread is itself waiting for cancellations and, if so, wake ourselves up to check for
// cancellations to process. This will busy-loop but at least it should eventually
// resolve assuming fair scheduling.
//
// To make things extra-annoying, in order to update our waitingForCancel flag, we have
// to lock our own executor state, but we can't take both locks at once, so we have to
// release the other lock in the meantime.
// Make sure we unset waitingForCancel on the way out.
KJ_DEFER({
lock = {};
Vector<_::XThreadEvent*> eventsToCancelOutsideLock;
KJ_DEFER(selfExecutor->impl->processAsyncCancellations(eventsToCancelOutsideLock));
auto selfLock = selfExecutor->impl->state.lockExclusive();
selfLock->waitingForCancel = false;
selfLock->dispatchCancels(eventsToCancelOutsideLock);
// We don't need to re-take the lock on the other executor here; it's not used again
// after this scope.
});
while (state != DONE) {
bool otherThreadIsWaiting = lock->waitingForCancel;
// Make sure our waitingForCancel is on and dispatch any pending cancellations on this
// thread.
lock = {};
{
Vector<_::XThreadEvent*> eventsToCancelOutsideLock;
KJ_DEFER(selfExecutor->impl->processAsyncCancellations(eventsToCancelOutsideLock));
auto selfLock = selfExecutor->impl->state.lockExclusive();
selfLock->waitingForCancel = true;
// Note that we don't have to proactively delete the PromiseNodes extracted from
// the canceled events because those nodes belong to this thread and can't possibly
// continue executing while we're blocked here.
selfLock->dispatchCancels(eventsToCancelOutsideLock);
}
if (otherThreadIsWaiting) {
// We know the other thread was waiting for cancellations to complete a moment ago.
// We may have just processed the necessary cancellations in this thread, in which
// case the other thread needs a chance to receive control and notice this. Or, it
// may be that the other thread is waiting for some third thread to take action.
// Either way, we should yield control here to give things a chance to settle.
// Otherwise we could end up in a tight busy loop.
#if _WIN32
Sleep(0);
#else
sched_yield();
#endif
}
// OK now we can take the original lock again.
lock = targetExecutor->impl->state.lockExclusive();
// OK, now we can wait for the other thread to either process our cancellation or
// indicate that it is waiting for remote cancellation.
lock.wait([&](const Executor::Impl::State& executorState) {
return state == DONE || executorState.waitingForCancel;
});
}
} else {
// We have no executor of our own so we don't have to worry about cancellation cycles
// causing deadlock.
//
// NOTE: I don't think we can actually get here, because it implies that this is a
// synchronous execution, which means there's no way to cancel it.
lock.wait([&](auto&) { return state == DONE; });
}
KJ_DASSERT(!targetLink.isLinked());
break;
}
case CANCELING:
KJ_FAIL_ASSERT("impossible state: CANCELING should only be set within the above case");
case DONE:
// Became done while we waited for lock. Nothing to do.
break;
}
}
KJ_IF_MAYBE(e, replyExecutor) {
// Since we know we reached the DONE state (or never left UNUSED), we know that the remote
// thread is all done playing with our `replyPrev` pointer. Only the current thread could
// possibly modify it after this point. So we can skip the lock if it's already null.
if (replyLink.isLinked()) {
auto lock = e->impl->state.lockExclusive();
lock->replies.remove(*this);
}
}
}
void XThreadEvent::sendReply() {
KJ_IF_MAYBE(e, replyExecutor) {
// Queue the reply.
const EventLoop* replyLoop;
{
auto lock = e->impl->state.lockExclusive();
KJ_IF_MAYBE(l, lock->loop) {
lock->replies.add(*this);
replyLoop = l;
} else {
// Calling thread exited without cancelling the promise. This is UB. In fact,
// `replyExecutor` is probably already destroyed and we are in use-after-free territory
// already. Better abort.
KJ_LOG(FATAL,
"the thread which called kj::Executor::executeAsync() apparently exited its own "
"event loop without canceling the cross-thread promise first; this is undefined "
"behavior so I will crash now");
abort();
}
}
// Note that it's safe to assume `replyLoop` still exists even though we dropped the lock
// because that thread would have had to cancel any promises before destroying its own
// EventLoop, and when it tries to destroy this promise, it will wait for `state` to become
// `DONE`, which we don't set until later on. That's nice because wake() probably makes a
// syscall and we'd rather not hold the lock through syscalls.
KJ_IF_MAYBE(p, replyLoop->port) {
p->wake();
}
}
}
void XThreadEvent::done() {
KJ_ASSERT(targetExecutor.get() == &currentEventLoop().getExecutor(),
"calling done() from wrong thread?");
sendReply();
{
auto lock = targetExecutor->impl->state.lockExclusive();
switch (state) {
case EXECUTING:
lock->executing.remove(*this);
break;
case CANCELING:
// Sending thread requested cancellation, but we're done anyway, so it doesn't matter at this
// point.
lock->cancel.remove(*this);
break;
default:
KJ_FAIL_ASSERT("can't call done() from this state", (uint)state);
}
setDoneState();
}
}
inline void XThreadEvent::setDoneState() {
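// The release store pairs with the acquire load in ensureDoneOrCanceled(), so the requesting
// thread sees all of this thread's prior writes once it observes DONE.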
__atomic_store_n(&state, DONE, __ATOMIC_RELEASE);
}
void XThreadEvent::setDisconnected() {
result.addException(KJ_EXCEPTION(DISCONNECTED,
"Executor's event loop exited before cross-thread event could complete"));
}
class XThreadEvent::DelayedDoneHack: public Disposer {
// Crazy hack: In fire(), we want to call done() if the event is finished. But done() signals
// the requesting thread to wake up and possibly delete the XThreadEvent. But the caller (the
// EventLoop) still has to set `event->firing = false` after `fire()` returns, so this would be
// a race condition use-after-free.
//
// It just so happens, though, that fire() is allowed to return an optional `Own<Event>` to drop,
// and the caller drops that pointer immediately after setting event->firing = false. So we
// return a pointer whose disposer calls done().
//
// It's not quite as much of a hack as it seems: The whole reason fire() returns an Own<Event> is
// so that the event can delete itself, but do so after the caller sets event->firing = false.
// It just happens to be that in this case, the event isn't deleting itself, but rather releasing
// itself back to the other thread.
protected:
void disposeImpl(void* pointer) const override {
reinterpret_cast<XThreadEvent*>(pointer)->done();
}
};
Maybe<Own<Event>> XThreadEvent::fire() {
static constexpr DelayedDoneHack DISPOSER {};
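// If promiseNode is already non-null, we previously called execute() and the promise has now
// completed; harvest the result. Otherwise this is the first firing: run execute() and, if it
// returned a promise, wait for it.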
KJ_IF_MAYBE(n, promiseNode) {
n->get()->get(result);
promiseNode = nullptr; // make sure to destroy in the thread that created it
return Own<Event>(this, DISPOSER);
} else {
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
promiseNode = execute();
})) {
result.addException(kj::mv(*exception));
};
KJ_IF_MAYBE(n, promiseNode) {
n->get()->onReady(this);
} else {
return Own<Event>(this, DISPOSER);
}
}
return nullptr;
}
void XThreadEvent::traceEvent(TraceBuilder& builder) {
KJ_IF_MAYBE(n, promiseNode) {
n->get()->tracePromise(builder, true);
}
// We can't safely trace into another thread, so we'll stop here.
builder.add(funcTracePtr);
}
void XThreadEvent::onReady(Event* event) noexcept {
onReadyEvent.init(event);
}
XThreadPaf::XThreadPaf()
: state(WAITING), executor(getCurrentThreadExecutor()) {}
XThreadPaf::~XThreadPaf() noexcept(false) {}
void XThreadPaf::Disposer::disposeImpl(void* pointer) const {
XThreadPaf* obj = reinterpret_cast<XThreadPaf*>(pointer);
auto oldState = WAITING;
if (__atomic_load_n(&obj->state, __ATOMIC_ACQUIRE) == DISPATCHED) {
// Common case: Promise was fully fulfilled and dispatched, no need for locking.
delete obj;
} else if (__atomic_compare_exchange_n(&obj->state, &oldState, CANCELED, false,
__ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE)) {
// State transitioned from WAITING to CANCELED, so now it's the fulfiller's job to destroy the
// object.
} else {
// Whoops, another thread is already in the process of fulfilling this promise. We'll have to
// wait for it to finish and transition the state to FULFILLED.
obj->executor.impl->state.when([&](auto&) {
return obj->state == FULFILLED || obj->state == DISPATCHED;
}, [&](Executor::Impl::State& exState) {
if (obj->state == FULFILLED) {
// The object is on the queue but was not yet dispatched. Remove it.
exState.fulfilled.remove(*obj);
}
});
// It's ours now, delete it.
delete obj;
}
}
const XThreadPaf::Disposer XThreadPaf::DISPOSER;
void XThreadPaf::onReady(Event* event) noexcept {
onReadyEvent.init(event);
}
void XThreadPaf::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) {
// We can't safely trace into another thread, so we'll stop here.
// Maybe returning the address of get() will give us a function name with meaningful type
// information.
builder.add(getMethodStartAddress(implicitCast<PromiseNode&>(*this), &PromiseNode::get));
}
XThreadPaf::FulfillScope::FulfillScope(XThreadPaf** pointer) {
obj = __atomic_exchange_n(pointer, static_cast<XThreadPaf*>(nullptr), __ATOMIC_ACQUIRE);
auto oldState = WAITING;
if (obj == nullptr) {
// Already fulfilled (possibly by another thread).
} else if (__atomic_compare_exchange_n(&obj->state, &oldState, FULFILLING, false,
__ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE)) {
// Transitioned to FULFILLING, good.
} else {
// The waiting thread must have canceled.
KJ_ASSERT(oldState == CANCELED);
// It's our responsibility to clean up, then.
delete obj;
// Set `obj` null so that we don't try to fill it in or delete it later.
obj = nullptr;
}
}
XThreadPaf::FulfillScope::~FulfillScope() noexcept(false) {
if (obj != nullptr) {
auto lock = obj->executor.impl->state.lockExclusive();
KJ_IF_MAYBE(l, lock->loop) {
lock->fulfilled.add(*obj);
__atomic_store_n(&obj->state, FULFILLED, __ATOMIC_RELEASE);
KJ_IF_MAYBE(p, l->port) {
// TODO(perf): It's annoying we have to call wake() with the lock held, but we have to
// prevent the destination EventLoop from being destroyed first.
p->wake();
}
} else {
KJ_LOG(FATAL,
"the thread which called kj::newPromiseAndCrossThreadFulfiller<T>() apparently exited "
"its own event loop without canceling the cross-thread promise first; this is "
"undefined behavior so I will crash now");
abort();
}
}
}
kj::Exception XThreadPaf::unfulfilledException() {
// TODO(cleanup): Share code with regular PromiseAndFulfiller for stack tracing here.
return kj::Exception(kj::Exception::Type::FAILED, __FILE__, __LINE__, kj::heapString(
"cross-thread PromiseFulfiller was destroyed without fulfilling the promise."));
}
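// Concrete Executor implementation that is atomically refcounted so references to it can be
// shared across threads.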
class ExecutorImpl: public Executor, public AtomicRefcounted {
public:
using Executor::Executor;
kj::Own<const Executor> addRef() const override {
return kj::atomicAddRef(*this);
}
};
} // namespace _
Executor::Executor(EventLoop& loop, Badge<EventLoop>): impl(kj::heap<Impl>(loop)) {}
Executor::~Executor() noexcept(false) {}
bool Executor::isLive() const {
return impl->state.lockShared()->loop != nullptr;
}
void Executor::send(_::XThreadEvent& event, bool sync) const {
KJ_ASSERT(event.state == _::XThreadEvent::UNUSED);
if (sync) {
EventLoop* thisThread = threadLocalEventLoop;
if (thisThread != nullptr &&
thisThread->executor.map([this](auto& e) { return e == this; }).orDefault(false)) {
// Invoking a sync request on our own thread. Just execute it directly; if we try to queue
// it to the loop, we'll deadlock.
auto promiseNode = event.execute();
// If the function returns a promise, we have no way to pump the event loop to wait for it,
// because the event loop may already be pumping somewhere up the stack.
KJ_ASSERT(promiseNode == nullptr,
"can't call executeSync() on own thread's executor with a promise-returning function");
return;
}
} else {
event.replyExecutor = getCurrentThreadExecutor();
// Note that async requests will "just work" even if the target executor is our own thread's
// executor. In theory we could detect this case to avoid some locking and signals but that
// would be extra code complexity for probably little benefit.
}
auto lock = impl->state.lockExclusive();
const EventLoop* loop;
KJ_IF_MAYBE(l, lock->loop) {
loop = l;
} else {
event.setDisconnected();
return;
}
event.state = _::XThreadEvent::QUEUED;
lock->start.add(event);
KJ_IF_MAYBE(p, loop->port) {
p->wake();
} else {
// Event loop will be waiting on executor.wait(), which will be woken when we unlock the mutex.
}
if (sync) {
lock.wait([&](auto&) { return event.state == _::XThreadEvent::DONE; });
}
}
void Executor::wait() {
Vector<_::XThreadEvent*> eventsToCancelOutsideLock;
KJ_DEFER(impl->processAsyncCancellations(eventsToCancelOutsideLock));
auto lock = impl->state.lockExclusive();
lock.wait([](const Impl::State& state) {
return state.isDispatchNeeded();
});
lock->dispatchAll(eventsToCancelOutsideLock);
}
bool Executor::poll() {
Vector<_::XThreadEvent*> eventsToCancelOutsideLock;
KJ_DEFER(impl->processAsyncCancellations(eventsToCancelOutsideLock));
auto lock = impl->state.lockExclusive();
if (lock->isDispatchNeeded()) {
lock->dispatchAll(eventsToCancelOutsideLock);
return true;
} else {
return false;
}
}
EventLoop& Executor::getLoop() const {
KJ_IF_MAYBE(l, impl->state.lockShared()->loop) {
return *l;
} else {
kj::throwFatalException(KJ_EXCEPTION(DISCONNECTED, "Executor's event loop has exited"));
}
}
const Executor& getCurrentThreadExecutor() {
return currentEventLoop().getExecutor();
}
// =======================================================================================
// Fiber implementation.
namespace _ { // private
#if KJ_USE_FIBERS
#if !(_WIN32 || __CYGWIN__)
struct FiberStack::Impl {
// This struct serves two purposes:
// - It contains OS-specific state that we don't want to declare in the header.
// - It is allocated at the top of the fiber's stack area, so the Impl pointer also serves to
// track where the stack was allocated.
jmp_buf fiberJmpBuf;
jmp_buf originalJmpBuf;
#if KJ_HAS_COMPILER_FEATURE(address_sanitizer)
// Stuff that we need to pass to __sanitizer_start_switch_fiber() /
// __sanitizer_finish_switch_fiber() when using ASAN.
void* originalFakeStack = nullptr;
void* fiberFakeStack = nullptr;
// Pointer to ASAN "fake stack" associated with the fiber and its calling stack. Filled in by
// __sanitizer_start_switch_fiber() before switching away, consumed by
// __sanitizer_finish_switch_fiber() upon switching back.
void const* originalBottom;
size_t originalSize;
// Size and location of the original stack before switching fibers. These are filled in by
// __sanitizer_finish_switch_fiber() after the switch, and must be passed to
// __sanitizer_start_switch_fiber() when switching back later.
#endif
static Impl* alloc(size_t stackSize, ucontext_t* context) {
#ifndef MAP_ANONYMOUS
#define MAP_ANONYMOUS MAP_ANON
#endif
#ifndef MAP_STACK
#define MAP_STACK 0
#endif
size_t pageSize = getPageSize();
size_t allocSize = stackSize + pageSize; // size plus guard page and impl
// Allocate virtual address space for the stack but make it inaccessible initially.
// TODO(someday): Does it make sense to use MAP_GROWSDOWN on Linux? It's a kind of bizarre flag
// that causes the mapping to automatically allocate extra pages (beyond the range specified)
// until it hits something...
void* stackMapping = mmap(nullptr, allocSize, PROT_NONE,
MAP_ANONYMOUS | MAP_PRIVATE | MAP_STACK, -1, 0);
if (stackMapping == MAP_FAILED) {
KJ_FAIL_SYSCALL("mmap(new stack)", errno);
}
KJ_ON_SCOPE_FAILURE({
KJ_SYSCALL(munmap(stackMapping, allocSize)) { break; }
});
void* stack = reinterpret_cast<byte*>(stackMapping) + pageSize;
// Now mark everything except the guard page as read-write. We assume the stack grows down, so
// the guard page is at the beginning. No modern architecture uses stacks that grow up.
KJ_SYSCALL(mprotect(stack, stackSize, PROT_READ | PROT_WRITE));
// Stick `Impl` at the top of the stack.
Impl* impl = (reinterpret_cast<Impl*>(reinterpret_cast<byte*>(stack) + stackSize) - 1);
// Note: mmap() allocates zero'd pages so we don't have to memset() anything here.
KJ_SYSCALL(getcontext(context));
#if __APPLE__ && __aarch64__
// Per issue #1386, apple on arm64 zeros the entire configured stack.
// But this is redundant, since we just allocated the stack with mmap() which
// returns zero'd pages. Re-zeroing is both slow and results in prematurely
// allocating pages we may not need -- it's normal for stacks to rely heavily
// on lazy page allocation to avoid wasting memory. Instead, we lie:
// we allocate the full size, but tell the ucontext the stack is the last
// page only. This appears to work as no particular bounds checks or
// anything are set up based on what we say here.
context->uc_stack.ss_size = min(pageSize, stackSize) - sizeof(Impl);
context->uc_stack.ss_sp = reinterpret_cast<char*>(stack) + stackSize - min(pageSize, stackSize);
#else
context->uc_stack.ss_size = stackSize - sizeof(Impl);
context->uc_stack.ss_sp = reinterpret_cast<char*>(stack);
#endif
context->uc_stack.ss_flags = 0;
// We don't use uc_link since our fiber start routine runs forever in a loop to allow for
// reuse. When we're done with the fiber, we just destroy it, without switching to its
// stack. This is safe since the start routine doesn't allocate any memory or RAII objects
// before looping.
context->uc_link = 0;
return impl;
}
static void free(Impl* impl, size_t stackSize) {
size_t allocSize = stackSize + getPageSize();
void* stack = reinterpret_cast<byte*>(impl + 1) - allocSize;
KJ_SYSCALL(munmap(stack, allocSize)) { break; }
}
static size_t getPageSize() {
#ifndef _SC_PAGESIZE
#define _SC_PAGESIZE _SC_PAGE_SIZE
#endif
static size_t result = sysconf(_SC_PAGESIZE);
return result;
}
};
#endif
#endif
struct FiberStack::StartRoutine {
#if _WIN32 || __CYGWIN__
static void WINAPI run(LPVOID ptr) {
// This is the static C-style function we pass to CreateFiber().
reinterpret_cast<FiberStack*>(ptr)->run();
}
#else
[[noreturn]] static void run(int arg1, int arg2) {
// This is the static C-style function we pass to makeContext().
// POSIX says the arguments are ints, not pointers. So we split our pointer in half in order to
// work correctly on 64-bit machines. Gross.
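// (sizeof(ptr) * 4 is half the pointer's width in bits, since sizeof() counts 8-bit bytes.)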
uintptr_t ptr = static_cast<uint>(arg1);
ptr |= static_cast<uintptr_t>(static_cast<uint>(arg2)) << (sizeof(ptr) * 4);
auto& stack = *reinterpret_cast<FiberStack*>(ptr);
__sanitizer_finish_switch_fiber(nullptr,
&stack.impl->originalBottom, &stack.impl->originalSize);
// We first switch to the fiber inside of the FiberStack constructor. This is just for
// initialization purposes, and we're expected to switch back immediately.
stack.switchToMain();
// OK now have a real job.
stack.run();
}
#endif
};
void FiberStack::run() {
// Loop forever so that the fiber can be reused.
for (;;) {
KJ_SWITCH_ONEOF(main) {
KJ_CASE_ONEOF(event, FiberBase*) {
event->run();
}
KJ_CASE_ONEOF(func, SynchronousFunc*) {
KJ_IF_MAYBE(exception, kj::runCatchingExceptions(func->func)) {
func->exception.emplace(kj::mv(*exception));
}
}
}
// Wait for the fiber to be used again. Note the fiber might simply be destroyed without this
// ever returning. That's OK because we don't have any nontrivial destructors on the stack
// at this point.
switchToMain();
}
}
FiberStack::FiberStack(size_t stackSizeParam)
// Force stackSize to a reasonable minimum.
: stackSize(kj::max(stackSizeParam, 65536))
{
#if KJ_USE_FIBERS
#if _WIN32 || __CYGWIN__
// We can create fibers before we convert the main thread into a fiber in FiberBase
KJ_WIN32(osFiber = CreateFiber(stackSize, &StartRoutine::run, this));
#else
// Note: Nothing below here can throw. If that changes then we need to call Impl::free(impl)
// on exceptions...
ucontext_t context;
impl = Impl::alloc(stackSize, &context);
// POSIX says the arguments are ints, not pointers. So we split our pointer in half in order to
// work correctly on 64-bit machines. Gross.
uintptr_t ptr = reinterpret_cast<uintptr_t>(this);
int arg1 = ptr & ((uintptr_t(1) << (sizeof(ptr) * 4)) - 1);
int arg2 = ptr >> (sizeof(ptr) * 4);
makecontext(&context, reinterpret_cast<void(*)()>(&StartRoutine::run), 2, arg1, arg2);
__sanitizer_start_switch_fiber(&impl->originalFakeStack, impl, stackSize - sizeof(Impl));
if (_setjmp(impl->originalJmpBuf) == 0) {
setcontext(&context);
}
__sanitizer_finish_switch_fiber(impl->originalFakeStack, nullptr, nullptr);
#endif
#else
#if KJ_NO_EXCEPTIONS
KJ_UNIMPLEMENTED("Fibers are not implemented because exceptions are disabled");
#else
KJ_UNIMPLEMENTED(
"Fibers are not implemented on this platform because its C library lacks setcontext() "
"and friends. If you'd like to see fiber support added, file a bug to let us know. "
"We can likely make it happen using assembly, but didn't want to try unless it was "
"actually needed.");
#endif
#endif
}
FiberStack::~FiberStack() noexcept(false) {
#if KJ_USE_FIBERS
#if _WIN32 || __CYGWIN__
DeleteFiber(osFiber);
#else
Impl::free(impl, stackSize);
#endif
#endif
}
void FiberStack::initialize(FiberBase& fiber) {
KJ_REQUIRE(this->main == nullptr);
this->main = &fiber;
}
void FiberStack::initialize(SynchronousFunc& func) {
KJ_REQUIRE(this->main == nullptr);
this->main = &func;
}
FiberBase::FiberBase(size_t stackSize, _::ExceptionOrValue& result, SourceLocation location)
: Event(location), state(WAITING), stack(kj::heap<FiberStack>(stackSize)), result(result) {
stack->initialize(*this);
ensureThreadCanRunFibers();
}
FiberBase::FiberBase(const FiberPool& pool, _::ExceptionOrValue& result, SourceLocation location)
: Event(location), state(WAITING), result(result) {
stack = pool.impl->takeStack();
stack->initialize(*this);
ensureThreadCanRunFibers();
}
FiberBase::~FiberBase() noexcept(false) {}
void FiberBase::destroy() {
// Called by `~Fiber()` to begin teardown. We can't do this work in `~FiberBase()` because the
// `Fiber` subclass contains members that may still be in-use until the fiber stops.
switch (state) {
case WAITING:
// We can't just free the stack while the fiber is running. We need to force it to execute
// until finished, so we cause it to throw an exception.
state = CANCELED;
stack->switchToFiber();
// The fiber should only switch back to the main stack on completion, because any further
// calls to wait() would throw before trying to switch.
KJ_ASSERT(state == FINISHED);
// The fiber shut down properly so the stack is safe to reuse.
stack->reset();
break;
case RUNNING:
case CANCELED:
// Bad news.
KJ_LOG(FATAL, "fiber tried to destroy itself");
::abort();
break;
case FINISHED:
// Normal completion, yay.
stack->reset();
break;
}
}
Maybe<Own<Event>> FiberBase::fire() {
KJ_ASSERT(state == WAITING);
state = RUNNING;
stack->switchToFiber();
return nullptr;
}
void FiberStack::switchToFiber() {
// Switch from the main stack to the fiber. Returns once the fiber either calls switchToMain()
// or returns from its main function.
#if KJ_USE_FIBERS
#if _WIN32 || __CYGWIN__
SwitchToFiber(osFiber);
#else
__sanitizer_start_switch_fiber(&impl->originalFakeStack, impl, stackSize - sizeof(Impl));
if (_setjmp(impl->originalJmpBuf) == 0) {
_longjmp(impl->fiberJmpBuf, 1);
}
__sanitizer_finish_switch_fiber(impl->originalFakeStack, nullptr, nullptr);
#endif
#endif
}
void FiberStack::switchToMain() {
// Switch from the fiber to the main stack. Returns the next time the main stack calls
// switchToFiber().
#if KJ_USE_FIBERS
#if _WIN32 || __CYGWIN__
SwitchToFiber(getMainWin32Fiber());
#else
// TODO(someday): In theory, the last time we switch away from the fiber, we should pass `nullptr`
// for the first argument here, so that ASAN destroys the fake stack. However, as currently
// designed, we don't actually know if we're switching away for the last time. It's understood
// that when we call switchToMain() in FiberStack::run(), then the main stack is allowed to
// destroy the fiber, or reuse it. I don't want to develop a mechanism to switch back to the
// fiber on final destruction just to get the hints right, so instead we leak the fake stack.
// This doesn't seem to cause any problems -- it's not even detected by ASAN as a memory leak.
// But if we wanted to run ASAN builds in production or something, it might be an issue.
__sanitizer_start_switch_fiber(&impl->fiberFakeStack,
impl->originalBottom, impl->originalSize);
if (_setjmp(impl->fiberJmpBuf) == 0) {
_longjmp(impl->originalJmpBuf, 1);
}
__sanitizer_finish_switch_fiber(impl->fiberFakeStack,
&impl->originalBottom, &impl->originalSize);
#endif
#endif
}
void FiberBase::run() {
#if !KJ_NO_EXCEPTIONS
bool caughtCanceled = false;
state = RUNNING;
KJ_DEFER(state = FINISHED);
WaitScope waitScope(currentEventLoop(), *this);
try {
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
runImpl(waitScope);
})) {
result.addException(kj::mv(*exception));
}
} catch (CanceledException) {
if (state != CANCELED) {
// no idea who would throw this but it's not really our problem
result.addException(KJ_EXCEPTION(FAILED, "Caught CanceledException, but fiber wasn't canceled"));
}
caughtCanceled = true;
}
if (state == CANCELED && !caughtCanceled) {
KJ_LOG(ERROR, "Canceled fiber apparently caught CanceledException and didn't rethrow it. "
"Generally, applications should not catch CanceledException, but if they do, they must always rethrow.");
}
onReadyEvent.arm();
#endif
}
void FiberBase::onReady(_::Event* event) noexcept {
onReadyEvent.init(event);
}
void FiberBase::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) {
if (stopAtNextEvent) return;
currentInner->tracePromise(builder, false);
stack->trace(builder);
}
void FiberBase::traceEvent(TraceBuilder& builder) {
currentInner->tracePromise(builder, true);
stack->trace(builder);
onReadyEvent.traceEvent(builder);
}
} // namespace _ (private)
// =======================================================================================
void EventPort::setRunnable(bool runnable) {}
void EventPort::wake() const {
kj::throwRecoverableException(KJ_EXCEPTION(UNIMPLEMENTED,
"cross-thread wake() not implemented by this EventPort implementation"));
}
EventLoop::EventLoop()
: daemons(kj::heap<TaskSet>(_::LoggingErrorHandler::instance)) {}
EventLoop::EventLoop(EventPort& port)
: port(port),
daemons(kj::heap<TaskSet>(_::LoggingErrorHandler::instance)) {}
EventLoop::~EventLoop() noexcept(false) {
// Destroy all "daemon" tasks, noting that their destructors might register more daemon tasks.
while (!daemons->isEmpty()) {
auto oldDaemons = kj::mv(daemons);
daemons = kj::heap<TaskSet>(_::LoggingErrorHandler::instance);
}
daemons = nullptr;
KJ_IF_MAYBE(e, executor) {
// Cancel all outstanding cross-thread events.
e->get()->impl->disconnect();
}
// The application _should_ destroy everything using the EventLoop before destroying the
// EventLoop itself, so if there are events on the loop, this indicates a memory leak.
KJ_REQUIRE(head == nullptr, "EventLoop destroyed with events still in the queue. Memory leak?",
head->traceEvent()) {
// Unlink all the events and hope that no one ever fires them...
_::Event* event = head;
while (event != nullptr) {
_::Event* next = event->next;
event->next = nullptr;
event->prev = nullptr;
event = next;
}
break;
}
KJ_REQUIRE(threadLocalEventLoop != this,
"EventLoop destroyed while still current for the thread.") {
threadLocalEventLoop = nullptr;
break;
}
}
void EventLoop::run(uint maxTurnCount) {
running = true;
KJ_DEFER(running = false);
for (uint i = 0; i < maxTurnCount; i++) {
if (!turn()) {
break;
}
}
setRunnable(isRunnable());
}
bool EventLoop::turn() {
_::Event* event = head;
if (event == nullptr) {
// No events in the queue.
return false;
} else {
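// Pop the event off the front of the queue, fixing up any insert pointers that referenced it.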
head = event->next;
if (head != nullptr) {
head->prev = &head;
}
depthFirstInsertPoint = &head;
if (breadthFirstInsertPoint == &event->next) {
breadthFirstInsertPoint = &head;
}
if (tail == &event->next) {
tail = &head;
}
event->next = nullptr;
event->prev = nullptr;
Maybe<Own<_::Event>> eventToDestroy;
{
event->firing = true;
KJ_DEFER(event->firing = false);
currentlyFiring = event;
KJ_DEFER(currentlyFiring = nullptr);
eventToDestroy = event->fire();
}
depthFirstInsertPoint = &head;
return true;
}
}
bool EventLoop::isRunnable() {
return head != nullptr;
}
const Executor& EventLoop::getExecutor() {
KJ_IF_MAYBE(e, executor) {
return **e;
} else {
return *executor.emplace(kj::atomicRefcounted<_::ExecutorImpl>(*this, Badge<EventLoop>()));
}
}
void EventLoop::setRunnable(bool runnable) {
if (runnable != lastRunnableState) {
KJ_IF_MAYBE(p, port) {
p->setRunnable(runnable);
}
lastRunnableState = runnable;
}
}
void EventLoop::enterScope() {
KJ_REQUIRE(threadLocalEventLoop == nullptr, "This thread already has an EventLoop.");
threadLocalEventLoop = this;
}
void EventLoop::leaveScope() {
KJ_REQUIRE(threadLocalEventLoop == this,
"WaitScope destroyed in a different thread than it was created in.") {
break;
}
threadLocalEventLoop = nullptr;
}
void EventLoop::wait() {
KJ_IF_MAYBE(p, port) {
if (p->wait()) {
// Another thread called wake(). Check for cross-thread events.
KJ_IF_MAYBE(e, executor) {
e->get()->poll();
}
}
} else KJ_IF_MAYBE(e, executor) {
e->get()->wait();
} else {
KJ_FAIL_REQUIRE("Nothing to wait for; this thread would hang forever.");
}
}
void EventLoop::poll() {
KJ_IF_MAYBE(p, port) {
if (p->poll()) {
// Another thread called wake(). Check for cross-thread events.
KJ_IF_MAYBE(e, executor) {
e->get()->poll();
}
}
} else KJ_IF_MAYBE(e, executor) {
e->get()->poll();
}
}
void WaitScope::poll() {
KJ_REQUIRE(&loop == threadLocalEventLoop, "WaitScope not valid for this thread.");
KJ_REQUIRE(!loop.running, "poll() is not allowed from within event callbacks.");
loop.running = true;
KJ_DEFER(loop.running = false);
runOnStackPool([&]() {
for (;;) {
if (!loop.turn()) {
// No events in the queue. Poll for I/O.
loop.poll();
if (!loop.isRunnable()) {
// Still no events in the queue. We're done.
return;
}
}
}
});
}
void WaitScope::cancelAllDetached() {
KJ_REQUIRE(fiber == nullptr,
"can't call cancelAllDetached() on a fiber WaitScope, only top-level");
while (!loop.daemons->isEmpty()) {
auto oldDaemons = kj::mv(loop.daemons);
loop.daemons = kj::heap<TaskSet>(_::LoggingErrorHandler::instance);
// Destroying `oldDaemons` could theoretically add new ones.
}
}
namespace _ { // private
#if !KJ_NO_EXCEPTIONS
static kj::CanceledException fiberCanceledException() {
// Construct the exception to throw from wait() when the fiber has been canceled (because the
// promise returned by startFiber() was dropped before completion).
return kj::CanceledException { };
};
#endif
void waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, WaitScope& waitScope,
SourceLocation location) {
EventLoop& loop = waitScope.loop;
KJ_REQUIRE(&loop == threadLocalEventLoop, "WaitScope not valid for this thread.");
#if !KJ_NO_EXCEPTIONS
// we don't support fibers when running without exceptions, so just remove the whole block
KJ_IF_MAYBE(fiber, waitScope.fiber) {
if (fiber->state == FiberBase::CANCELED) {
throw fiberCanceledException();
}
KJ_REQUIRE(fiber->state == FiberBase::RUNNING,
"This WaitScope can only be used within the fiber that created it.");
node->setSelfPointer(&node);
node->onReady(fiber);
fiber->currentInner = node;
KJ_DEFER(fiber->currentInner = nullptr);
// Switch to the main stack to run the event loop.
fiber->state = FiberBase::WAITING;
fiber->stack->switchToMain();
// The main stack switched back to us, meaning either the event we registered with
// node->onReady() fired, or we are being canceled by FiberBase's destructor.
if (fiber->state == FiberBase::CANCELED) {
throw fiberCanceledException();
}
KJ_ASSERT(fiber->state == FiberBase::RUNNING);
} else {
#endif
KJ_REQUIRE(!loop.running, "wait() is not allowed from within event callbacks.");
RootEvent doneEvent(node, reinterpret_cast<void*>(&waitImpl), location);
node->setSelfPointer(&node);
node->onReady(&doneEvent);
loop.running = true;
KJ_DEFER(loop.running = false);
for (;;) {
waitScope.runOnStackPool([&]() {
uint counter = 0;
while (!doneEvent.fired) {
if (!loop.turn()) {
// No events in the queue. Wait for callback.
return;
} else if (++counter > waitScope.busyPollInterval) {
// Note: It's intentional that if busyPollInterval is kj::maxValue, we never poll.
counter = 0;
loop.poll();
}
}
});
if (doneEvent.fired) {
break;
} else {
loop.wait();
}
}
loop.setRunnable(loop.isRunnable());
#if !KJ_NO_EXCEPTIONS
}
#endif
waitScope.runOnStackPool([&]() {
node->get(result);
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
node = nullptr;
})) {
result.addException(kj::mv(*exception));
}
});
}
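// The fiber branch of waitImpl() above is what lets code running inside a fiber call .wait()
// even though the fiber itself was invoked from the event loop. A hedged sketch, assuming the
// startFiber() declaration in async.h (stack size plus a callback taking the fiber's WaitScope);
// someAsyncCall() is hypothetical:
//
//     kj::Promise<int> promise = kj::startFiber(65536, [](kj::WaitScope& fiberScope) {
//       // Inside the fiber, wait() switches back to the main stack (switchToMain() above)
//       // rather than recursively running the event loop.
//       int partial = someAsyncCall().wait(fiberScope);
//       return partial + 1;
//     });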
bool pollImpl(_::PromiseNode& node, WaitScope& waitScope, SourceLocation location) {
EventLoop& loop = waitScope.loop;
KJ_REQUIRE(&loop == threadLocalEventLoop, "WaitScope not valid for this thread.");
KJ_REQUIRE(waitScope.fiber == nullptr, "poll() is not supported in fibers.");
KJ_REQUIRE(!loop.running, "poll() is not allowed from within event callbacks.");
RootEvent doneEvent(&node, reinterpret_cast<void*>(&pollImpl), location);
node.onReady(&doneEvent);
loop.running = true;
KJ_DEFER(loop.running = false);
waitScope.runOnStackPool([&]() {
while (!doneEvent.fired) {
if (!loop.turn()) {
// No events in the queue. Poll for I/O.
loop.poll();
if (!doneEvent.fired && !loop.isRunnable()) {
// No progress. Give up.
node.onReady(nullptr);
loop.setRunnable(false);
break;
}
}
}
});
if (!doneEvent.fired) {
return false;
}
loop.setRunnable(loop.isRunnable());
return true;
}
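// pollImpl() backs Promise::poll(), which checks for readiness without blocking. Sketch, with a
// hypothetical `promise`:
//
//     if (promise.poll(waitScope)) {
//       // The promise has already resolved or rejected; wait() will not block.
//       auto value = promise.wait(waitScope);
//     }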
Promise<void> yield() {
return _::PromiseNode::to<Promise<void>>(kj::heap<YieldPromiseNode>());
}
Promise<void> yieldHarder() {
return _::PromiseNode::to<Promise<void>>(kj::heap<YieldHarderPromiseNode>());
}
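// yield() is the node underlying kj::evalLater(): the continuation is scheduled breadth-first so
// it runs on a later turn of the loop rather than inline. Sketch (compute() is hypothetical):
//
//     kj::Promise<int> later = kj::evalLater([]() { return compute(); });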
Own<PromiseNode> neverDone() {
return kj::heap<NeverDonePromiseNode>();
}
void NeverDone::wait(WaitScope& waitScope, SourceLocation location) const {
ExceptionOr<Void> dummy;
waitImpl(neverDone(), dummy, waitScope, location);
KJ_UNREACHABLE;
}
void detach(kj::Promise<void>&& promise) {
EventLoop& loop = currentEventLoop();
KJ_REQUIRE(loop.daemons.get() != nullptr, "EventLoop is shutting down.") { return; }
loop.daemons->add(kj::mv(promise));
}
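// This is the backend of Promise::detach(): the promise joins the loop's daemon TaskSet and runs
// to completion on its own, reporting failures to the supplied handler. Typical use, sketched
// with a hypothetical doBackgroundThing():
//
//     kj::evalLater([]() { doBackgroundThing(); })
//         .detach([](kj::Exception&& e) { KJ_LOG(ERROR, e); });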
Event::Event(SourceLocation location)
: loop(currentEventLoop()), next(nullptr), prev(nullptr), location(location) {}
Event::Event(kj::EventLoop& loop, SourceLocation location)
: loop(loop), next(nullptr), prev(nullptr), location(location) {}
Event::~Event() noexcept(false) {
live = 0;
  // The signal fence below prevents the compiler from eliding the store above. It probably isn't
  // needed: there are complex calls later in this destructor, and the compiler probably can't
  // prove that they won't come back and examine `live`, so it won't elide the write anyway.
  // However, an atomic_signal_fence is also sufficient to tell the compiler that a signal handler
  // might access `live`, so it won't optimize away the write. Note that a signal fence does not
  // produce any instructions; it just blocks compiler optimizations.
std::atomic_signal_fence(std::memory_order_acq_rel);
disarm();
KJ_REQUIRE(!firing, "Promise callback destroyed itself.");
}
void Event::armDepthFirst() {
KJ_REQUIRE(threadLocalEventLoop == &loop || threadLocalEventLoop == nullptr,
"Event armed from different thread than it was created in. You must use "
"Executor to queue events cross-thread.");
if (live != MAGIC_LIVE_VALUE) {
([this]() noexcept {
KJ_FAIL_ASSERT("tried to arm Event after it was destroyed", location);
})();
}
if (prev == nullptr) {
next = *loop.depthFirstInsertPoint;
prev = loop.depthFirstInsertPoint;
*prev = this;
if (next != nullptr) {
next->prev = &next;
}
loop.depthFirstInsertPoint = &next;
if (loop.breadthFirstInsertPoint == prev) {
loop.breadthFirstInsertPoint = &next;
}
if (loop.tail == prev) {
loop.tail = &next;
}
loop.setRunnable(true);
}
}
void Event::armBreadthFirst() {
KJ_REQUIRE(threadLocalEventLoop == &loop || threadLocalEventLoop == nullptr,
"Event armed from different thread than it was created in. You must use "
"Executor to queue events cross-thread.");
if (live != MAGIC_LIVE_VALUE) {
([this]() noexcept {
KJ_FAIL_ASSERT("tried to arm Event after it was destroyed", location);
})();
}
if (prev == nullptr) {
next = *loop.breadthFirstInsertPoint;
prev = loop.breadthFirstInsertPoint;
*prev = this;
if (next != nullptr) {
next->prev = &next;
}
loop.breadthFirstInsertPoint = &next;
if (loop.tail == prev) {
loop.tail = &next;
}
loop.setRunnable(true);
}
}
void Event::armLast() {
KJ_REQUIRE(threadLocalEventLoop == &loop || threadLocalEventLoop == nullptr,
"Event armed from different thread than it was created in. You must use "
"Executor to queue events cross-thread.");
if (live != MAGIC_LIVE_VALUE) {
([this]() noexcept {
KJ_FAIL_ASSERT("tried to arm Event after it was destroyed", location);
})();
}
if (prev == nullptr) {
next = *loop.breadthFirstInsertPoint;
prev = loop.breadthFirstInsertPoint;
*prev = this;
if (next != nullptr) {
next->prev = &next;
}
// We don't update loop.breadthFirstInsertPoint because we want further inserts to go *before*
// this event.
if (loop.tail == prev) {
loop.tail = &next;
}
loop.setRunnable(true);
}
}
bool Event::isNext() {
return loop.running && loop.head == this;
}
void Event::disarm() {
if (prev != nullptr) {
if (threadLocalEventLoop != &loop && threadLocalEventLoop != nullptr) {
KJ_LOG(FATAL, "Promise destroyed from a different thread than it was created in.");
// There's no way out of this place without UB, so abort now.
abort();
}
if (loop.tail == &next) {
loop.tail = prev;
}
if (loop.depthFirstInsertPoint == &next) {
loop.depthFirstInsertPoint = prev;
}
if (loop.breadthFirstInsertPoint == &next) {
loop.breadthFirstInsertPoint = prev;
}
*prev = next;
if (next != nullptr) {
next->prev = prev;
}
prev = nullptr;
next = nullptr;
}
}
String Event::traceEvent() {
void* space[32];
TraceBuilder builder(space);
traceEvent(builder);
return kj::str(builder);
}
String TraceBuilder::toString() {
auto result = finish();
return kj::str(stringifyStackTraceAddresses(result),
stringifyStackTrace(result));
}
} // namespace _ (private)
ArrayPtr<void* const> getAsyncTrace(ArrayPtr<void*> space) {
EventLoop* loop = threadLocalEventLoop;
if (loop == nullptr) return nullptr;
if (loop->currentlyFiring == nullptr) return nullptr;
_::TraceBuilder builder(space);
loop->currentlyFiring->traceEvent(builder);
return builder.finish();
}
kj::String getAsyncTrace() {
void* space[32];
auto trace = getAsyncTrace(space);
return kj::str(stringifyStackTraceAddresses(trace), stringifyStackTrace(trace));
}
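// Debugging sketch: called from inside a promise continuation, this logs the chain of events
// that led to the currently-firing callback:
//
//     KJ_LOG(INFO, "async trace", kj::getAsyncTrace());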
// =======================================================================================
namespace _ { // private
kj::String PromiseBase::trace() {
void* space[32];
TraceBuilder builder(space);
node->tracePromise(builder, false);
return kj::str(builder);
}
void PromiseNode::setSelfPointer(Own<PromiseNode>* selfPtr) noexcept {}
void PromiseNode::OnReadyEvent::init(Event* newEvent) {
if (event == _kJ_ALREADY_READY) {
// A new continuation was added to a promise that was already ready. In this case, we schedule
// breadth-first, to make it difficult for applications to accidentally starve the event loop
// by repeatedly waiting on immediate promises.
if (newEvent) newEvent->armBreadthFirst();
} else {
event = newEvent;
}
}
void PromiseNode::OnReadyEvent::arm() {
KJ_ASSERT(event != _kJ_ALREADY_READY, "arm() should only be called once");
if (event != nullptr) {
// A promise resolved and an event is already waiting on it. In this case, arm in depth-first
// order so that the event runs immediately after the current one. This way, chained promises
// execute together for better cache locality and lower latency.
event->armDepthFirst();
}
event = _kJ_ALREADY_READY;
}
void PromiseNode::OnReadyEvent::armBreadthFirst() {
KJ_ASSERT(event != _kJ_ALREADY_READY, "armBreadthFirst() should only be called once");
if (event != nullptr) {
// A promise resolved and an event is already waiting on it.
event->armBreadthFirst();
}
event = _kJ_ALREADY_READY;
}
// -------------------------------------------------------------------
ImmediatePromiseNodeBase::ImmediatePromiseNodeBase() {}
ImmediatePromiseNodeBase::~ImmediatePromiseNodeBase() noexcept(false) {}
void ImmediatePromiseNodeBase::onReady(Event* event) noexcept {
if (event) event->armBreadthFirst();
}
void ImmediatePromiseNodeBase::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) {
// Maybe returning the address of get() will give us a function name with meaningful type
// information.
builder.add(getMethodStartAddress(implicitCast<PromiseNode&>(*this), &PromiseNode::get));
}
ImmediateBrokenPromiseNode::ImmediateBrokenPromiseNode(Exception&& exception)
: exception(kj::mv(exception)) {}
void ImmediateBrokenPromiseNode::get(ExceptionOrValue& output) noexcept {
output.exception = kj::mv(exception);
}
// -------------------------------------------------------------------
AttachmentPromiseNodeBase::AttachmentPromiseNodeBase(Own<PromiseNode>&& dependencyParam)
: dependency(kj::mv(dependencyParam)) {
dependency->setSelfPointer(&dependency);
}
void AttachmentPromiseNodeBase::onReady(Event* event) noexcept {
dependency->onReady(event);
}
void AttachmentPromiseNodeBase::get(ExceptionOrValue& output) noexcept {
dependency->get(output);
}
void AttachmentPromiseNodeBase::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) {
dependency->tracePromise(builder, stopAtNextEvent);
// TODO(debug): Maybe use __builtin_return_address to get the locations that called fork() and
// addBranch()?
}
void AttachmentPromiseNodeBase::dropDependency() {
dependency = nullptr;
}
// -------------------------------------------------------------------
TransformPromiseNodeBase::TransformPromiseNodeBase(
Own<PromiseNode>&& dependencyParam, void* continuationTracePtr)
: dependency(kj::mv(dependencyParam)), continuationTracePtr(continuationTracePtr) {
dependency->setSelfPointer(&dependency);
}
void TransformPromiseNodeBase::onReady(Event* event) noexcept {
dependency->onReady(event);
}
void TransformPromiseNodeBase::get(ExceptionOrValue& output) noexcept {
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
getImpl(output);
dropDependency();
})) {
output.addException(kj::mv(*exception));
}
}
void TransformPromiseNodeBase::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) {
// Note that we null out the dependency just before calling our own continuation, which
// conveniently means that if we're currently executing the continuation when the trace is
// requested, it won't trace into the obsolete dependency. Nice.
if (dependency.get() != nullptr) {
dependency->tracePromise(builder, stopAtNextEvent);
}
builder.add(continuationTracePtr);
}
void TransformPromiseNodeBase::dropDependency() {
dependency = nullptr;
}
void TransformPromiseNodeBase::getDepResult(ExceptionOrValue& output) {
dependency->get(output);
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
dependency = nullptr;
})) {
output.addException(kj::mv(*exception));
}
KJ_IF_MAYBE(e, output.exception) {
e->addTrace(continuationTracePtr);
}
}
// -------------------------------------------------------------------
ForkBranchBase::ForkBranchBase(Own<ForkHubBase>&& hubParam): hub(kj::mv(hubParam)) {
if (hub->tailBranch == nullptr) {
onReadyEvent.arm();
} else {
// Insert into hub's linked list of branches.
prevPtr = hub->tailBranch;
*prevPtr = this;
next = nullptr;
hub->tailBranch = &next;
}
}
ForkBranchBase::~ForkBranchBase() noexcept(false) {
if (prevPtr != nullptr) {
// Remove from hub's linked list of branches.
*prevPtr = next;
(next == nullptr ? hub->tailBranch : next->prevPtr) = prevPtr;
}
}
void ForkBranchBase::hubReady() noexcept {
onReadyEvent.arm();
}
void ForkBranchBase::releaseHub(ExceptionOrValue& output) {
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() {
hub = nullptr;
})) {
output.addException(kj::mv(*exception));
}
}
void ForkBranchBase::onReady(Event* event) noexcept {
onReadyEvent.init(event);
}
void ForkBranchBase::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) {
if (stopAtNextEvent) return;
if (hub.get() != nullptr) {
hub->inner->tracePromise(builder, false);
}
// TODO(debug): Maybe use __builtin_return_address to get the locations that called fork() and
// addBranch()?
}
// -------------------------------------------------------------------
ForkHubBase::ForkHubBase(Own<PromiseNode>&& innerParam, ExceptionOrValue& resultRef,
SourceLocation location)
: Event(location), inner(kj::mv(innerParam)), resultRef(resultRef) {
inner->setSelfPointer(&inner);
inner->onReady(this);
}
Maybe<Own<Event>> ForkHubBase::fire() {
// Dependency is ready. Fetch its result and then delete the node.
inner->get(resultRef);
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() {
inner = nullptr;
})) {
resultRef.addException(kj::mv(*exception));
}
for (auto branch = headBranch; branch != nullptr; branch = branch->next) {
branch->hubReady();
*branch->prevPtr = nullptr;
branch->prevPtr = nullptr;
}
*tailBranch = nullptr;
// Indicate that the list is no longer active.
tailBranch = nullptr;
return nullptr;
}
void ForkHubBase::traceEvent(TraceBuilder& builder) {
if (inner.get() != nullptr) {
inner->tracePromise(builder, true);
}
if (headBranch != nullptr) {
// We'll trace down the first branch, I guess.
headBranch->onReadyEvent.traceEvent(builder);
}
}
// -------------------------------------------------------------------
ChainPromiseNode::ChainPromiseNode(Own<PromiseNode> innerParam, SourceLocation location)
: Event(location), state(STEP1), inner(kj::mv(innerParam)) {
inner->setSelfPointer(&inner);
inner->onReady(this);
}
ChainPromiseNode::~ChainPromiseNode() noexcept(false) {}
void ChainPromiseNode::onReady(Event* event) noexcept {
switch (state) {
case STEP1:
onReadyEvent = event;
return;
case STEP2:
inner->onReady(event);
return;
}
KJ_UNREACHABLE;
}
void ChainPromiseNode::setSelfPointer(Own<PromiseNode>* selfPtr) noexcept {
if (state == STEP2) {
*selfPtr = kj::mv(inner); // deletes this!
selfPtr->get()->setSelfPointer(selfPtr);
} else {
this->selfPtr = selfPtr;
}
}
void ChainPromiseNode::get(ExceptionOrValue& output) noexcept {
KJ_REQUIRE(state == STEP2);
return inner->get(output);
}
void ChainPromiseNode::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) {
if (stopAtNextEvent && state == STEP1) {
// In STEP1, we are an Event -- when the inner node resolves, it will arm *this* object.
// In STEP2, we are not an Event -- when the inner node resolves, it directly arms our parent
// event.
return;
}
inner->tracePromise(builder, stopAtNextEvent);
}
Maybe<Own<Event>> ChainPromiseNode::fire() {
KJ_REQUIRE(state != STEP2);
static_assert(sizeof(Promise<int>) == sizeof(PromiseBase),
"This code assumes Promise<T> does not add any new members to PromiseBase.");
ExceptionOr<PromiseBase> intermediate;
inner->get(intermediate);
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() {
inner = nullptr;
})) {
intermediate.addException(kj::mv(*exception));
}
KJ_IF_MAYBE(exception, intermediate.exception) {
// There is an exception. If there is also a value, delete it.
kj::runCatchingExceptions([&]() { intermediate.value = nullptr; });
// Now set step2 to a rejected promise.
inner = heap<ImmediateBrokenPromiseNode>(kj::mv(*exception));
} else KJ_IF_MAYBE(value, intermediate.value) {
// There is a value and no exception. The value is itself a promise. Adopt it as our
// step2.
inner = _::PromiseNode::from(kj::mv(*value));
} else {
// We can only get here if inner->get() returned neither an exception nor a
// value, which never actually happens.
KJ_FAIL_ASSERT("Inner node returned empty value.");
}
state = STEP2;
if (selfPtr != nullptr) {
// Hey, we can shorten the chain here.
auto chain = selfPtr->downcast<ChainPromiseNode>();
*selfPtr = kj::mv(inner);
selfPtr->get()->setSelfPointer(selfPtr);
if (onReadyEvent != nullptr) {
selfPtr->get()->onReady(onReadyEvent);
}
// Return our self-pointer so that the caller takes care of deleting it.
return Own<Event>(kj::mv(chain));
} else {
inner->setSelfPointer(&inner);
if (onReadyEvent != nullptr) {
inner->onReady(onReadyEvent);
}
return nullptr;
}
}
void ChainPromiseNode::traceEvent(TraceBuilder& builder) {
switch (state) {
case STEP1:
if (inner.get() != nullptr) {
inner->tracePromise(builder, true);
}
if (!builder.full() && onReadyEvent != nullptr) {
onReadyEvent->traceEvent(builder);
}
break;
case STEP2:
// This probably never happens -- a trace being generated after the meat of fire() already
// executed. If it does, though, we probably can't do anything here. We don't know if
// `onReadyEvent` is still valid because we passed it on to the phase-2 promise, and tracing
// just `inner` would probably be confusing. Let's just do nothing.
break;
}
}
// -------------------------------------------------------------------
ExclusiveJoinPromiseNode::ExclusiveJoinPromiseNode(
Own<PromiseNode> left, Own<PromiseNode> right, SourceLocation location)
: left(*this, kj::mv(left), location), right(*this, kj::mv(right), location) {}
ExclusiveJoinPromiseNode::~ExclusiveJoinPromiseNode() noexcept(false) {}
void ExclusiveJoinPromiseNode::onReady(Event* event) noexcept {
onReadyEvent.init(event);
}
void ExclusiveJoinPromiseNode::get(ExceptionOrValue& output) noexcept {
KJ_REQUIRE(left.get(output) || right.get(output), "get() called before ready.");
}
void ExclusiveJoinPromiseNode::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) {
// TODO(debug): Maybe use __builtin_return_address to get the locations that called
// exclusiveJoin()?
if (stopAtNextEvent) return;
// Trace the left branch I guess.
if (left.dependency.get() != nullptr) {
left.dependency->tracePromise(builder, false);
} else if (right.dependency.get() != nullptr) {
right.dependency->tracePromise(builder, false);
}
}
ExclusiveJoinPromiseNode::Branch::Branch(
ExclusiveJoinPromiseNode& joinNode, Own<PromiseNode> dependencyParam, SourceLocation location)
: Event(location), joinNode(joinNode), dependency(kj::mv(dependencyParam)) {
dependency->setSelfPointer(&dependency);
dependency->onReady(this);
}
ExclusiveJoinPromiseNode::Branch::~Branch() noexcept(false) {}
bool ExclusiveJoinPromiseNode::Branch::get(ExceptionOrValue& output) {
if (dependency) {
dependency->get(output);
return true;
} else {
return false;
}
}
Maybe<Own<Event>> ExclusiveJoinPromiseNode::Branch::fire() {
if (dependency) {
// Cancel the branch that didn't return first. Ignore exceptions caused by cancellation.
if (this == &joinNode.left) {
kj::runCatchingExceptions([&]() { joinNode.right.dependency = nullptr; });
} else {
kj::runCatchingExceptions([&]() { joinNode.left.dependency = nullptr; });
}
joinNode.onReadyEvent.arm();
} else {
// The other branch already fired, and this branch was canceled. It's possible for both
// branches to fire if both were armed simultaneously.
}
return nullptr;
}
void ExclusiveJoinPromiseNode::Branch::traceEvent(TraceBuilder& builder) {
if (dependency.get() != nullptr) {
dependency->tracePromise(builder, true);
}
joinNode.onReadyEvent.traceEvent(builder);
}
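// These branches implement Promise::exclusiveJoin(). Sketch, with hypothetical fetchFromCache()
// and fetchFromNetwork() both returning the same promise type:
//
//     auto winner = fetchFromCache().exclusiveJoin(fetchFromNetwork());
//     // Whichever branch resolves first supplies the result; Branch::fire() above cancels the
//     // other by nulling out its dependency.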
// -------------------------------------------------------------------
ArrayJoinPromiseNodeBase::ArrayJoinPromiseNodeBase(
Array<Own<PromiseNode>> promises, ExceptionOrValue* resultParts, size_t partSize,
SourceLocation location)
: countLeft(promises.size()) {
// Make the branches.
auto builder = heapArrayBuilder<Branch>(promises.size());
for (uint i: indices(promises)) {
ExceptionOrValue& output = *reinterpret_cast<ExceptionOrValue*>(
reinterpret_cast<byte*>(resultParts) + i * partSize);
builder.add(*this, kj::mv(promises[i]), output, location);
}
branches = builder.finish();
if (branches.size() == 0) {
onReadyEvent.arm();
}
}
ArrayJoinPromiseNodeBase::~ArrayJoinPromiseNodeBase() noexcept(false) {}
void ArrayJoinPromiseNodeBase::onReady(Event* event) noexcept {
onReadyEvent.init(event);
}
void ArrayJoinPromiseNodeBase::get(ExceptionOrValue& output) noexcept {
// If any of the elements threw exceptions, propagate them.
for (auto& branch: branches) {
KJ_IF_MAYBE(exception, branch.getPart()) {
output.addException(kj::mv(*exception));
}
}
if (output.exception == nullptr) {
// No errors. The template subclass will need to fill in the result.
getNoError(output);
}
}
void ArrayJoinPromiseNodeBase::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) {
// TODO(debug): Maybe use __builtin_return_address to get the locations that called
// joinPromises()?
if (stopAtNextEvent) return;
// Trace the first branch I guess.
if (branches != nullptr) {
branches[0].dependency->tracePromise(builder, false);
}
}
ArrayJoinPromiseNodeBase::Branch::Branch(
ArrayJoinPromiseNodeBase& joinNode, Own<PromiseNode> dependencyParam, ExceptionOrValue& output,
SourceLocation location)
: Event(location), joinNode(joinNode), dependency(kj::mv(dependencyParam)), output(output) {
dependency->setSelfPointer(&dependency);
dependency->onReady(this);
}
ArrayJoinPromiseNodeBase::Branch::~Branch() noexcept(false) {}
Maybe<Own<Event>> ArrayJoinPromiseNodeBase::Branch::fire() {
if (--joinNode.countLeft == 0) {
joinNode.onReadyEvent.arm();
}
return nullptr;
}
void ArrayJoinPromiseNodeBase::Branch::traceEvent(TraceBuilder& builder) {
dependency->tracePromise(builder, true);
joinNode.onReadyEvent.traceEvent(builder);
}
Maybe<Exception> ArrayJoinPromiseNodeBase::Branch::getPart() {
dependency->get(output);
return kj::mv(output.exception);
}
ArrayJoinPromiseNode<void>::ArrayJoinPromiseNode(
Array<Own<PromiseNode>> promises, Array<ExceptionOr<_::Void>> resultParts,
SourceLocation location)
: ArrayJoinPromiseNodeBase(kj::mv(promises), resultParts.begin(), sizeof(ExceptionOr<_::Void>),
location),
resultParts(kj::mv(resultParts)) {}
ArrayJoinPromiseNode<void>::~ArrayJoinPromiseNode() {}
void ArrayJoinPromiseNode<void>::getNoError(ExceptionOrValue& output) noexcept {
output.as<_::Void>() = _::Void();
}
} // namespace _ (private)
Promise<void> joinPromises(Array<Promise<void>>&& promises, SourceLocation location) {
return _::PromiseNode::to<Promise<void>>(kj::heap<_::ArrayJoinPromiseNode<void>>(
KJ_MAP(p, promises) { return _::PromiseNode::from(kj::mv(p)); },
heapArray<_::ExceptionOr<_::Void>>(promises.size()), location));
}
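// Usage sketch for the overload above (taskA() and taskB() are hypothetical, and a WaitScope is
// assumed to be in scope):
//
//     kj::Vector<kj::Promise<void>> tasks;
//     tasks.add(taskA());
//     tasks.add(taskB());
//     kj::Promise<void> all = kj::joinPromises(tasks.releaseAsArray());
//     all.wait(waitScope);   // resolves once every element has completed; failures rethrow here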
namespace _ { // (private)
// -------------------------------------------------------------------
EagerPromiseNodeBase::EagerPromiseNodeBase(
Own<PromiseNode>&& dependencyParam, ExceptionOrValue& resultRef, SourceLocation location)
: Event(location), dependency(kj::mv(dependencyParam)), resultRef(resultRef) {
dependency->setSelfPointer(&dependency);
dependency->onReady(this);
}
void EagerPromiseNodeBase::onReady(Event* event) noexcept {
onReadyEvent.init(event);
}
void EagerPromiseNodeBase::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) {
// TODO(debug): Maybe use __builtin_return_address to get the locations that called
// eagerlyEvaluate()? But note that if a non-null exception handler was passed to it, that
// creates a TransformPromiseNode which will report the location anyhow.
if (stopAtNextEvent) return;
if (dependency.get() != nullptr) {
dependency->tracePromise(builder, stopAtNextEvent);
}
}
void EagerPromiseNodeBase::traceEvent(TraceBuilder& builder) {
if (dependency.get() != nullptr) {
dependency->tracePromise(builder, true);
}
onReadyEvent.traceEvent(builder);
}
Maybe<Own<Event>> EagerPromiseNodeBase::fire() {
dependency->get(resultRef);
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() {
dependency = nullptr;
})) {
resultRef.addException(kj::mv(*exception));
}
onReadyEvent.arm();
return nullptr;
}
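// EagerPromiseNodeBase backs Promise::eagerlyEvaluate(), which forces a promise chain to start
// making progress before anyone wait()s on it. Sketch, with a hypothetical startLongOperation()
// returning Promise<void>:
//
//     kj::Promise<void> p = startLongOperation()
//         .eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); });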
// -------------------------------------------------------------------
void AdapterPromiseNodeBase::onReady(Event* event) noexcept {
onReadyEvent.init(event);
}
void AdapterPromiseNodeBase::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) {
// Maybe returning the address of get() will give us a function name with meaningful type
// information.
builder.add(getMethodStartAddress(implicitCast<PromiseNode&>(*this), &PromiseNode::get));
}
void END_FULFILLER_STACK_START_LISTENER_STACK() {}
// Dummy symbol used when reporting how a PromiseFulfiller was destroyed without fulfilling the
// promise. We end up combining two stack traces into one and we use this as a separator.
void WeakFulfillerBase::disposeImpl(void* pointer) const {
if (inner == nullptr) {
// Already detached.
delete this;
} else {
if (inner->isWaiting()) {
// Let's find out if there's an exception being thrown. If so, we'll use it to reject the
// promise.
inner->reject(getDestructionReason(
reinterpret_cast<void*>(&END_FULFILLER_STACK_START_LISTENER_STACK),
kj::Exception::Type::FAILED, __FILE__, __LINE__,
"PromiseFulfiller was destroyed without fulfilling the promise."_kj));
}
inner = nullptr;
}
}
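// This disposer is part of what gives newPromiseAndFulfiller() its "broken on abandonment"
// behavior. Sketch:
//
//     auto paf = kj::newPromiseAndFulfiller<int>();
//     paf.fulfiller->fulfill(42);   // resolves paf.promise
//     // Had the fulfiller instead been destroyed without calling fulfill() or reject(),
//     // paf.promise would reject with the FAILED exception constructed above.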
} // namespace _ (private)
// -------------------------------------------------------------------
namespace _ { // (private)
Promise<void> IdentityFunc<Promise<void>>::operator()() const { return READY_NOW; }
} // namespace _ (private)
// -------------------------------------------------------------------
#if KJ_HAS_COROUTINE
namespace _ { // (private)
CoroutineBase::CoroutineBase(stdcoro::coroutine_handle<> coroutine, ExceptionOrValue& resultRef,
SourceLocation location)
: Event(location),
coroutine(coroutine),
resultRef(resultRef) {}
CoroutineBase::~CoroutineBase() noexcept(false) {
readMaybe(maybeDisposalResults)->destructorRan = true;
}
void CoroutineBase::unhandled_exception() {
  // Pretty self-explanatory: we propagate the exception to the promise which owns us, unless
// we're being destroyed, in which case we propagate it back to our disposer. Note that all
// unhandled exceptions end up here, not just ones after the first co_await.
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([] { throw; })) {
KJ_IF_MAYBE(disposalResults, maybeDisposalResults) {
// Exception during coroutine destruction. Only record the first one.
if (disposalResults->exception == nullptr) {
disposalResults->exception = kj::mv(*exception);
}
} else if (isWaiting()) {
// Exception during coroutine execution.
resultRef.addException(kj::mv(*exception));
scheduleResumption();
} else {
// Okay, what could this mean? We've already been fulfilled or rejected, but we aren't being
// destroyed yet. The only possibility is that we are unwinding the coroutine frame due to a
// successful completion, and something in the frame threw. We can't already be rejected,
// because rejecting a coroutine involves throwing, which would have unwound the frame prior
// to setting `waiting = false`.
//
// Since we know we're unwinding due to a successful completion, we also know that whatever
// Event we may have armed has not yet fired, because we haven't had a chance to return to
// the event loop.
// final_suspend() has not been called.
KJ_IASSERT(!coroutine.done());
// Since final_suspend() hasn't been called, whatever Event is waiting on us has not fired,
// and will see this exception.
resultRef.addException(kj::mv(*exception));
}
} else {
KJ_UNREACHABLE;
}
}
void CoroutineBase::onReady(Event* event) noexcept {
onReadyEvent.init(event);
}
void CoroutineBase::tracePromise(TraceBuilder& builder, bool stopAtNextEvent) {
if (stopAtNextEvent) return;
KJ_IF_MAYBE(promise, promiseNodeForTrace) {
promise->tracePromise(builder, stopAtNextEvent);
}
// Maybe returning the address of coroutine() will give us a function name with meaningful type
// information. (Narrator: It doesn't.)
builder.add(GetFunctorStartAddress<>::apply(coroutine));
}
Maybe<Own<Event>> CoroutineBase::fire() {
// Call Awaiter::await_resume() and proceed with the coroutine. Note that this will not destroy
// the coroutine if control flows off the end of it, because we return suspend_always() from
// final_suspend().
//
// It's tempting to arrange to check for exceptions right now and reject the promise that owns
// us without resuming the coroutine, which would save us from throwing an exception when we
// already know where it's going. But, we don't really know: unlike in the KJ_NO_EXCEPTIONS
// case, the `co_await` might be in a try-catch block, so we have no choice but to resume and
// throw later.
//
// TODO(someday): If we ever support coroutines with -fno-exceptions, we'll need to reject the
// enclosing coroutine promise here, if the Awaiter's result is exceptional.
promiseNodeForTrace = nullptr;
coroutine.resume();
return nullptr;
}
void CoroutineBase::traceEvent(TraceBuilder& builder) {
KJ_IF_MAYBE(promise, promiseNodeForTrace) {
promise->tracePromise(builder, true);
}
// Maybe returning the address of coroutine() will give us a function name with meaningful type
// information. (Narrator: It doesn't.)
builder.add(GetFunctorStartAddress<>::apply(coroutine));
onReadyEvent.traceEvent(builder);
}
void CoroutineBase::disposeImpl(void* pointer) const {
KJ_IASSERT(pointer == this);
// const_cast okay -- every Own<PromiseNode> that we build in get_return_object() uses itself
// as the disposer, thus every disposer is unique and there are no thread-safety concerns.
const_cast<CoroutineBase&>(*this).destroy();
}
void CoroutineBase::destroy() {
// Mutable helper function for disposeImpl(). Basically a wrapper around coroutine.destroy()
// with some stuff to propagate exceptions appropriately.
// Objects in the coroutine frame might throw from their destructors, so unhandled_exception()
// will need some way to communicate those exceptions back to us. Separately, we also want
// confirmation that our own ~Coroutine() destructor ran. To solve this, we put a
// DisposalResults object on the stack and set a pointer to it in the Coroutine object. This
// indicates to unhandled_exception() and ~Coroutine() where to store the results of the
// destruction operation.
DisposalResults disposalResults;
maybeDisposalResults = &disposalResults;
// Need to save this while `unwindDetector` is still valid.
bool shouldRethrow = !unwindDetector.isUnwinding();
do {
// Clang's implementation of the Coroutines TS does not destroy the Coroutine object or
// deallocate the coroutine frame if a destructor of an object on the frame threw an
// exception. This is despite the fact that it delivered the exception to _us_ via
// unhandled_exception(). Anyway, it appears we can work around this by running
// coroutine.destroy() a second time.
//
// On Clang, `disposalResults.exception != nullptr` implies `!disposalResults.destructorRan`.
// We could optimize out the separate `destructorRan` flag if we verify that other compilers
// behave the same way.
coroutine.destroy();
} while (!disposalResults.destructorRan);
// WARNING: `this` is now a dangling pointer.
KJ_IF_MAYBE(exception, disposalResults.exception) {
if (shouldRethrow) {
kj::throwFatalException(kj::mv(*exception));
} else {
// An exception is already unwinding the stack, so throwing this secondary exception would
// call std::terminate().
}
}
}
CoroutineBase::AwaiterBase::AwaiterBase(Own<PromiseNode> node): node(kj::mv(node)) {}
CoroutineBase::AwaiterBase::AwaiterBase(AwaiterBase&&) = default;
CoroutineBase::AwaiterBase::~AwaiterBase() noexcept(false) {
// Make sure it's safe to generate an async stack trace between now and when the Coroutine is
// destroyed.
KJ_IF_MAYBE(coroutineEvent, maybeCoroutineEvent) {
coroutineEvent->promiseNodeForTrace = nullptr;
}
unwindDetector.catchExceptionsIfUnwinding([this]() {
    // No need to check for a moved-from state; node will just ignore the nullification.
node = nullptr;
});
}
void CoroutineBase::AwaiterBase::getImpl(ExceptionOrValue& result) {
node->get(result);
KJ_IF_MAYBE(exception, result.exception) {
kj::throwFatalException(kj::mv(*exception));
}
}
bool CoroutineBase::AwaiterBase::awaitSuspendImpl(CoroutineBase& coroutineEvent) {
node->setSelfPointer(&node);
node->onReady(&coroutineEvent);
if (coroutineEvent.hasSuspendedAtLeastOnce && coroutineEvent.isNext()) {
// The result is immediately ready and this coroutine is running on the event loop's stack, not
// a user code stack. Let's cancel our event and immediately resume. It's important that we
// don't perform this optimization if this is the first suspension, because our caller may
// depend on running code before this promise's continuations fire.
coroutineEvent.disarm();
// We can resume ourselves by returning false. This accomplishes the same thing as if we had
// returned true from await_ready().
return false;
} else {
    // Otherwise, we must suspend. Store a reference to the promise we're waiting on for tracing
    // purposes; coroutineEvent.fire() and/or ~AwaiterBase() will null this out.
coroutineEvent.promiseNodeForTrace = *node;
maybeCoroutineEvent = coroutineEvent;
coroutineEvent.hasSuspendedAtLeastOnce = true;
return true;
}
}
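// All of the above is the machinery that lets kj::Promise<T> be used as a coroutine return type
// when KJ_HAS_COROUTINE is defined. A hedged sketch, with a hypothetical fetchText() returning
// kj::Promise<kj::String>:
//
//     kj::Promise<kj::String> decorated() {
//       kj::String raw = co_await fetchText();   // suspends via awaitSuspendImpl() above
//       co_return kj::str("decorated: ", raw);
//     }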
} // namespace _ (private)
#endif // KJ_HAS_COROUTINE
} // namespace kj