blob: 09f6c5ffae1034a7d2a3234f706b2e360c3cddc7 [file] [log] [blame]
// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
// This file contains extended inline implementation details that are required along with async.h.
// We move this all into a separate file to make async.h more readable.
//
// Non-inline declarations here are defined in async.c++.
#pragma once
#ifndef KJ_ASYNC_H_INCLUDED
#error "Do not include this directly; include kj/async.h."
#include "async.h" // help IDE parse this file
#endif
KJ_BEGIN_HEADER
#include "list.h"
namespace kj {
namespace _ { // private
template <typename T>
class ExceptionOr;
class ExceptionOrValue {
public:
ExceptionOrValue(bool, Exception&& exception): exception(kj::mv(exception)) {}
KJ_DISALLOW_COPY(ExceptionOrValue);
void addException(Exception&& exception) {
if (this->exception == nullptr) {
this->exception = kj::mv(exception);
}
}
template <typename T>
ExceptionOr<T>& as() { return *static_cast<ExceptionOr<T>*>(this); }
template <typename T>
const ExceptionOr<T>& as() const { return *static_cast<const ExceptionOr<T>*>(this); }
Maybe<Exception> exception;
protected:
// Allow subclasses to have move constructor / assignment.
ExceptionOrValue() = default;
ExceptionOrValue(ExceptionOrValue&& other) = default;
ExceptionOrValue& operator=(ExceptionOrValue&& other) = default;
};
template <typename T>
class ExceptionOr: public ExceptionOrValue {
public:
ExceptionOr() = default;
ExceptionOr(T&& value): value(kj::mv(value)) {}
ExceptionOr(bool, Exception&& exception): ExceptionOrValue(false, kj::mv(exception)) {}
ExceptionOr(ExceptionOr&&) = default;
ExceptionOr& operator=(ExceptionOr&&) = default;
Maybe<T> value;
};
template <typename T>
inline T convertToReturn(ExceptionOr<T>&& result) {
KJ_IF_MAYBE(value, result.value) {
KJ_IF_MAYBE(exception, result.exception) {
throwRecoverableException(kj::mv(*exception));
}
return _::returnMaybeVoid(kj::mv(*value));
} else KJ_IF_MAYBE(exception, result.exception) {
throwFatalException(kj::mv(*exception));
} else {
// Result contained neither a value nor an exception?
KJ_UNREACHABLE;
}
}
inline void convertToReturn(ExceptionOr<Void>&& result) {
// Override <void> case to use throwRecoverableException().
if (result.value != nullptr) {
KJ_IF_MAYBE(exception, result.exception) {
throwRecoverableException(kj::mv(*exception));
}
} else KJ_IF_MAYBE(exception, result.exception) {
throwRecoverableException(kj::mv(*exception));
} else {
// Result contained neither a value nor an exception?
KJ_UNREACHABLE;
}
}
class TraceBuilder {
// Helper for methods that build a call trace.
public:
TraceBuilder(ArrayPtr<void*> space)
: start(space.begin()), current(space.begin()), limit(space.end()) {}
inline void add(void* addr) {
if (current < limit) {
*current++ = addr;
}
}
inline bool full() const { return current == limit; }
ArrayPtr<void*> finish() {
return arrayPtr(start, current);
}
String toString();
private:
void** start;
void** current;
void** limit;
};
class Event {
// An event waiting to be executed. Not for direct use by applications -- promises use this
// internally.
public:
Event(SourceLocation location);
Event(kj::EventLoop& loop, SourceLocation location);
~Event() noexcept(false);
KJ_DISALLOW_COPY(Event);
void armDepthFirst();
// Enqueue this event so that `fire()` will be called from the event loop soon.
//
// Events scheduled in this way are executed in depth-first order: if an event callback arms
// more events, those events are placed at the front of the queue (in the order in which they
// were armed), so that they run immediately after the first event's callback returns.
//
// Depth-first event scheduling is appropriate for events that represent simple continuations
// of a previous event that should be globbed together for performance. Depth-first scheduling
// can lead to starvation, so any long-running task must occasionally yield with
// `armBreadthFirst()`. (Promise::then() uses depth-first whereas evalLater() uses
// breadth-first.)
//
// To use breadth-first scheduling instead, use `armBreadthFirst()`.
void armBreadthFirst();
// Like `armDepthFirst()` except that the event is placed at the end of the queue.
void armLast();
// Enqueues this event to happen after all other events have run to completion and there is
// really nothing left to do except wait for I/O.
bool isNext();
// True if the Event has been armed and is next in line to be fired. This can be used after
// calling PromiseNode::onReady(event) to determine if a promise being waited is immediately
// ready, in which case continuations may be optimistically run without returning to the event
// loop. Note that this optimization is only valid if we know that we would otherwise immediately
// return to the event loop without running more application code. So this turns out to be useful
// in fairly narrow circumstances, chiefly when a coroutine is about to suspend, but discovers it
// doesn't need to.
//
// Returns false if the event loop is not currently running. This ensures that promise
// continuations don't execute except under a call to .wait().
void disarm();
// If the event is armed but hasn't fired, cancel it. (Destroying the event does this
// implicitly.)
virtual void traceEvent(TraceBuilder& builder) = 0;
// Build a trace of the callers leading up to this event. `builder` will be populated with
// "return addresses" of the promise chain waiting on this event. The return addresses may
// actually the addresses of lambdas passed to .then(), but in any case, feeding them into
// addr2line should produce useful source code locations.
//
// `traceEvent()` may be called from an async signal handler while `fire()` is executing. It
// must not allocate nor take locks.
String traceEvent();
// Helper that builds a trace and stringifies it.
protected:
virtual Maybe<Own<Event>> fire() = 0;
// Fire the event. Possibly returns a pointer to itself, which will be discarded by the
// caller. This is the only way that an event can delete itself as a result of firing, as
// doing so from within fire() will throw an exception.
private:
friend class kj::EventLoop;
EventLoop& loop;
Event* next;
Event** prev;
bool firing = false;
static constexpr uint MAGIC_LIVE_VALUE = 0x1e366381u;
uint live = MAGIC_LIVE_VALUE;
SourceLocation location;
};
class PromiseNode {
// A Promise<T> contains a chain of PromiseNodes tracking the pending transformations.
//
// To reduce generated code bloat, PromiseNode is not a template. Instead, it makes very hacky
// use of pointers to ExceptionOrValue which actually point to ExceptionOr<T>, but are only
// so down-cast in the few places that really need to be templated. Luckily this is all
// internal implementation details.
public:
virtual void onReady(Event* event) noexcept = 0;
// Arms the given event when ready.
//
// May be called multiple times. If called again before the event was armed, the old event will
// never be armed, only the new one. If called again after the event was armed, the new event
// will be armed immediately. Can be called with nullptr to un-register the existing event.
virtual void setSelfPointer(Own<PromiseNode>* selfPtr) noexcept;
// Tells the node that `selfPtr` is the pointer that owns this node, and will continue to own
// this node until it is destroyed or setSelfPointer() is called again. ChainPromiseNode uses
// this to shorten redundant chains. The default implementation does nothing; only
// ChainPromiseNode should implement this.
virtual void get(ExceptionOrValue& output) noexcept = 0;
// Get the result. `output` points to an ExceptionOr<T> into which the result will be written.
// Can only be called once, and only after the node is ready. Must be called directly from the
// event loop, with no application code on the stack.
virtual void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) = 0;
// Build a trace of this promise chain, showing what it is currently waiting on.
//
// Since traces are ordered callee-before-caller, PromiseNode::tracePromise() should typically
// recurse to its child first, then after the child returns, add itself to the trace.
//
// If `stopAtNextEvent` is true, then the trace should stop as soon as it hits a PromiseNode that
// also implements Event, and should not trace that node or its children. This is used in
// conjunction with Event::traceEvent(). The chain of Events is often more sparse than the chain
// of PromiseNodes, because a TransformPromiseNode (which implements .then()) is not itself an
// Event. TransformPromiseNode instead tells its child node to directly notify its *parent* node
// when it is ready, and then TransformPromiseNode applies the .then() transformation during the
// call to .get().
//
// So, when we trace the chain of Events backwards, we end up hoping over segments of
// TransformPromiseNodes (and other similar types). In order to get those added to the trace,
// each Event must call back down the PromiseNode chain in the opposite direction, using this
// method.
//
// `tracePromise()` may be called from an async signal handler while `get()` is executing. It
// must not allocate nor take locks.
template <typename T>
static Own<PromiseNode> from(T&& promise) {
// Given a Promise, extract the PromiseNode.
return kj::mv(promise.node);
}
template <typename T>
static PromiseNode& from(T& promise) {
// Given a Promise, extract the PromiseNode.
return *promise.node;
}
template <typename T>
static T to(Own<PromiseNode>&& node) {
// Construct a Promise from a PromiseNode. (T should be a Promise type.)
return T(false, kj::mv(node));
}
protected:
class OnReadyEvent {
// Helper class for implementing onReady().
public:
void init(Event* newEvent);
void arm();
void armBreadthFirst();
// Arms the event if init() has already been called and makes future calls to init()
// automatically arm the event.
inline void traceEvent(TraceBuilder& builder) {
if (event != nullptr && !builder.full()) event->traceEvent(builder);
}
private:
Event* event = nullptr;
};
};
// -------------------------------------------------------------------
template <typename T>
inline NeverDone::operator Promise<T>() const {
return PromiseNode::to<Promise<T>>(neverDone());
}
// -------------------------------------------------------------------
class ImmediatePromiseNodeBase: public PromiseNode {
public:
ImmediatePromiseNodeBase();
~ImmediatePromiseNodeBase() noexcept(false);
void onReady(Event* event) noexcept override;
void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
};
template <typename T>
class ImmediatePromiseNode final: public ImmediatePromiseNodeBase {
// A promise that has already been resolved to an immediate value or exception.
public:
ImmediatePromiseNode(ExceptionOr<T>&& result): result(kj::mv(result)) {}
void get(ExceptionOrValue& output) noexcept override {
output.as<T>() = kj::mv(result);
}
private:
ExceptionOr<T> result;
};
class ImmediateBrokenPromiseNode final: public ImmediatePromiseNodeBase {
public:
ImmediateBrokenPromiseNode(Exception&& exception);
void get(ExceptionOrValue& output) noexcept override;
private:
Exception exception;
};
// -------------------------------------------------------------------
class AttachmentPromiseNodeBase: public PromiseNode {
public:
AttachmentPromiseNodeBase(Own<PromiseNode>&& dependency);
void onReady(Event* event) noexcept override;
void get(ExceptionOrValue& output) noexcept override;
void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
private:
Own<PromiseNode> dependency;
void dropDependency();
template <typename>
friend class AttachmentPromiseNode;
};
template <typename Attachment>
class AttachmentPromiseNode final: public AttachmentPromiseNodeBase {
// A PromiseNode that holds on to some object (usually, an Own<T>, but could be any movable
// object) until the promise resolves.
public:
AttachmentPromiseNode(Own<PromiseNode>&& dependency, Attachment&& attachment)
: AttachmentPromiseNodeBase(kj::mv(dependency)),
attachment(kj::mv<Attachment>(attachment)) {}
~AttachmentPromiseNode() noexcept(false) {
// We need to make sure the dependency is deleted before we delete the attachment because the
// dependency may be using the attachment.
dropDependency();
}
private:
Attachment attachment;
};
// -------------------------------------------------------------------
#if __GNUC__ >= 8 && !__clang__
// GCC 8's class-memaccess warning rightly does not like the memcpy()'s below, but there's no
// "legal" way for us to extract the content of a PTMF so too bad.
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wclass-memaccess"
#if __GNUC__ >= 11
// GCC 11's array-bounds is similarly upset with us for digging into "private" implementation
// details. But the format is well-defined by the ABI which cannot change so please just let us
// do it kthx.
#pragma GCC diagnostic ignored "-Warray-bounds"
#endif
#endif
template <typename T, typename ReturnType, typename... ParamTypes>
void* getMethodStartAddress(T& obj, ReturnType (T::*method)(ParamTypes...));
template <typename T, typename ReturnType, typename... ParamTypes>
void* getMethodStartAddress(const T& obj, ReturnType (T::*method)(ParamTypes...) const);
// Given an object and a pointer-to-method, return the start address of the method's code. The
// intent is that this address can be used in a trace; addr2line should map it to the start of
// the function's definition. For virtual methods, this does a vtable lookup on `obj` to determine
// the address of the specific implementation (otherwise, `obj` wouldn't be needed).
//
// Note that if the method is overloaded or is a template, you will need to explicitly specify
// the param and return types, otherwise the compiler won't know which overload / template
// specialization you are requesting.
class PtmfHelper {
// This class is a private helper for GetFunctorStartAddress and getMethodStartAddress(). The
// class represents the internal representation of a pointer-to-member-function.
template <typename... ParamTypes>
friend struct GetFunctorStartAddress;
template <typename T, typename ReturnType, typename... ParamTypes>
friend void* getMethodStartAddress(T& obj, ReturnType (T::*method)(ParamTypes...));
template <typename T, typename ReturnType, typename... ParamTypes>
friend void* getMethodStartAddress(const T& obj, ReturnType (T::*method)(ParamTypes...) const);
#if __GNUG__
void* ptr;
ptrdiff_t adj;
// Layout of a pointer-to-member-function used by GCC and compatible compilers.
void* apply(const void* obj) {
#if defined(__arm__) || defined(__mips__) || defined(__aarch64__)
if (adj & 1) {
ptrdiff_t voff = (ptrdiff_t)ptr;
#else
ptrdiff_t voff = (ptrdiff_t)ptr;
if (voff & 1) {
voff &= ~1;
#endif
return *(void**)(*(char**)obj + voff);
} else {
return ptr;
}
}
#define BODY \
PtmfHelper result; \
static_assert(sizeof(p) == sizeof(result), "unknown ptmf layout"); \
memcpy(&result, &p, sizeof(result)); \
return result
#else // __GNUG__
void* apply(const void* obj) { return nullptr; }
// TODO(port): PTMF instruction address extraction
#define BODY return PtmfHelper{}
#endif // __GNUG__, else
template <typename R, typename C, typename... P, typename F>
static PtmfHelper from(F p) { BODY; }
// Create a PtmfHelper from some arbitrary pointer-to-member-function which is not
// overloaded nor a template. In this case the compiler is able to deduce the full function
// signature directly given the name since there is only one function with that name.
template <typename R, typename C, typename... P>
static PtmfHelper from(R (C::*p)(NoInfer<P>...)) { BODY; }
template <typename R, typename C, typename... P>
static PtmfHelper from(R (C::*p)(NoInfer<P>...) const) { BODY; }
// Create a PtmfHelper from some poniter-to-member-function which is a template. In this case
// the function must match exactly the containing type C, return type R, and parameter types P...
// GetFunctorStartAddress normally specifies exactly the correct C and R, but can only make a
// guess at P. Luckily, if the function parameters are template parameters then it's not
// necessary to be precise about P.
#undef BODY
};
#if __GNUC__ >= 8 && !__clang__
#pragma GCC diagnostic pop
#endif
template <typename T, typename ReturnType, typename... ParamTypes>
void* getMethodStartAddress(T& obj, ReturnType (T::*method)(ParamTypes...)) {
return PtmfHelper::from<ReturnType, T, ParamTypes...>(method).apply(&obj);
}
template <typename T, typename ReturnType, typename... ParamTypes>
void* getMethodStartAddress(const T& obj, ReturnType (T::*method)(ParamTypes...) const) {
return PtmfHelper::from<ReturnType, T, ParamTypes...>(method).apply(&obj);
}
template <typename... ParamTypes>
struct GetFunctorStartAddress {
// Given a functor (any object defining operator()), return the start address of the function,
// suitable for passing to addr2line to obtain a source file/line for debugging purposes.
//
// This turns out to be incredibly hard to implement in the presence of overloaded or templated
// functors. Therefore, we impose these specific restrictions, specific to our use case:
// - Overloading is not allowed, but templating is. (Generally we only intend to support lambdas
// anyway.)
// - The template parameters to GetFunctorStartAddress specify a hint as to the expected
// parameter types. If the functor is templated, its parameters must match exactly these types.
// (If it's not templated, ParamTypes are ignored.)
template <typename Func>
static void* apply(Func&& func) {
typedef decltype(func(instance<ParamTypes>()...)) ReturnType;
return PtmfHelper::from<ReturnType, Decay<Func>, ParamTypes...>(
&Decay<Func>::operator()).apply(&func);
}
};
template <>
struct GetFunctorStartAddress<Void&&>: public GetFunctorStartAddress<> {};
// Hack for TransformPromiseNode use case: an input type of `Void` indicates that the function
// actually has no parameters.
class TransformPromiseNodeBase: public PromiseNode {
public:
TransformPromiseNodeBase(Own<PromiseNode>&& dependency, void* continuationTracePtr);
void onReady(Event* event) noexcept override;
void get(ExceptionOrValue& output) noexcept override;
void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
private:
Own<PromiseNode> dependency;
void* continuationTracePtr;
void dropDependency();
void getDepResult(ExceptionOrValue& output);
virtual void getImpl(ExceptionOrValue& output) = 0;
template <typename, typename, typename, typename>
friend class TransformPromiseNode;
};
template <typename T, typename DepT, typename Func, typename ErrorFunc>
class TransformPromiseNode final: public TransformPromiseNodeBase {
// A PromiseNode that transforms the result of another PromiseNode through an application-provided
// function (implements `then()`).
public:
TransformPromiseNode(Own<PromiseNode>&& dependency, Func&& func, ErrorFunc&& errorHandler,
void* continuationTracePtr)
: TransformPromiseNodeBase(kj::mv(dependency), continuationTracePtr),
func(kj::fwd<Func>(func)), errorHandler(kj::fwd<ErrorFunc>(errorHandler)) {}
~TransformPromiseNode() noexcept(false) {
// We need to make sure the dependency is deleted before we delete the continuations because it
// is a common pattern for the continuations to hold ownership of objects that might be in-use
// by the dependency.
dropDependency();
}
private:
Func func;
ErrorFunc errorHandler;
void getImpl(ExceptionOrValue& output) override {
ExceptionOr<DepT> depResult;
getDepResult(depResult);
KJ_IF_MAYBE(depException, depResult.exception) {
output.as<T>() = handle(
MaybeVoidCaller<Exception, FixVoid<ReturnType<ErrorFunc, Exception>>>::apply(
errorHandler, kj::mv(*depException)));
} else KJ_IF_MAYBE(depValue, depResult.value) {
output.as<T>() = handle(MaybeVoidCaller<DepT, T>::apply(func, kj::mv(*depValue)));
}
}
ExceptionOr<T> handle(T&& value) {
return kj::mv(value);
}
ExceptionOr<T> handle(PropagateException::Bottom&& value) {
return ExceptionOr<T>(false, value.asException());
}
};
// -------------------------------------------------------------------
class ForkHubBase;
class ForkBranchBase: public PromiseNode {
public:
ForkBranchBase(Own<ForkHubBase>&& hub);
~ForkBranchBase() noexcept(false);
void hubReady() noexcept;
// Called by the hub to indicate that it is ready.
// implements PromiseNode ------------------------------------------
void onReady(Event* event) noexcept override;
void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
protected:
inline ExceptionOrValue& getHubResultRef();
void releaseHub(ExceptionOrValue& output);
// Release the hub. If an exception is thrown, add it to `output`.
private:
OnReadyEvent onReadyEvent;
Own<ForkHubBase> hub;
ForkBranchBase* next = nullptr;
ForkBranchBase** prevPtr = nullptr;
friend class ForkHubBase;
};
template <typename T> T copyOrAddRef(T& t) { return t; }
template <typename T> Own<T> copyOrAddRef(Own<T>& t) { return t->addRef(); }
template <typename T> Maybe<Own<T>> copyOrAddRef(Maybe<Own<T>>& t) {
return t.map([](Own<T>& ptr) {
return ptr->addRef();
});
}
template <typename T>
class ForkBranch final: public ForkBranchBase {
// A PromiseNode that implements one branch of a fork -- i.e. one of the branches that receives
// a const reference.
public:
ForkBranch(Own<ForkHubBase>&& hub): ForkBranchBase(kj::mv(hub)) {}
void get(ExceptionOrValue& output) noexcept override {
ExceptionOr<T>& hubResult = getHubResultRef().template as<T>();
KJ_IF_MAYBE(value, hubResult.value) {
output.as<T>().value = copyOrAddRef(*value);
} else {
output.as<T>().value = nullptr;
}
output.exception = hubResult.exception;
releaseHub(output);
}
};
template <typename T, size_t index>
class SplitBranch final: public ForkBranchBase {
// A PromiseNode that implements one branch of a fork -- i.e. one of the branches that receives
// a const reference.
public:
SplitBranch(Own<ForkHubBase>&& hub): ForkBranchBase(kj::mv(hub)) {}
typedef kj::Decay<decltype(kj::get<index>(kj::instance<T>()))> Element;
void get(ExceptionOrValue& output) noexcept override {
ExceptionOr<T>& hubResult = getHubResultRef().template as<T>();
KJ_IF_MAYBE(value, hubResult.value) {
output.as<Element>().value = kj::mv(kj::get<index>(*value));
} else {
output.as<Element>().value = nullptr;
}
output.exception = hubResult.exception;
releaseHub(output);
}
};
// -------------------------------------------------------------------
class ForkHubBase: public Refcounted, protected Event {
public:
ForkHubBase(Own<PromiseNode>&& inner, ExceptionOrValue& resultRef, SourceLocation location);
inline ExceptionOrValue& getResultRef() { return resultRef; }
private:
Own<PromiseNode> inner;
ExceptionOrValue& resultRef;
ForkBranchBase* headBranch = nullptr;
ForkBranchBase** tailBranch = &headBranch;
// Tail becomes null once the inner promise is ready and all branches have been notified.
Maybe<Own<Event>> fire() override;
void traceEvent(TraceBuilder& builder) override;
friend class ForkBranchBase;
};
template <typename T>
class ForkHub final: public ForkHubBase {
// A PromiseNode that implements the hub of a fork. The first call to Promise::fork() replaces
// the promise's outer node with a ForkHub, and subsequent calls add branches to that hub (if
// possible).
public:
ForkHub(Own<PromiseNode>&& inner, SourceLocation location)
: ForkHubBase(kj::mv(inner), result, location) {}
Promise<_::UnfixVoid<T>> addBranch() {
return _::PromiseNode::to<Promise<_::UnfixVoid<T>>>(kj::heap<ForkBranch<T>>(addRef(*this)));
}
_::SplitTuplePromise<T> split(SourceLocation location) {
return splitImpl(MakeIndexes<tupleSize<T>()>(), location);
}
private:
ExceptionOr<T> result;
template <size_t... indexes>
_::SplitTuplePromise<T> splitImpl(Indexes<indexes...>, SourceLocation location) {
return kj::tuple(addSplit<indexes>(location)...);
}
template <size_t index>
ReducePromises<typename SplitBranch<T, index>::Element> addSplit(SourceLocation location) {
return _::PromiseNode::to<ReducePromises<typename SplitBranch<T, index>::Element>>(
maybeChain(kj::heap<SplitBranch<T, index>>(addRef(*this)),
implicitCast<typename SplitBranch<T, index>::Element*>(nullptr),
location));
}
};
inline ExceptionOrValue& ForkBranchBase::getHubResultRef() {
return hub->getResultRef();
}
// -------------------------------------------------------------------
class ChainPromiseNode final: public PromiseNode, public Event {
// Promise node which reduces Promise<Promise<T>> to Promise<T>.
//
// `Event` is only a public base class because otherwise we can't cast Own<ChainPromiseNode> to
// Own<Event>. Ugh, templates and private...
public:
explicit ChainPromiseNode(Own<PromiseNode> inner, SourceLocation location);
~ChainPromiseNode() noexcept(false);
void onReady(Event* event) noexcept override;
void setSelfPointer(Own<PromiseNode>* selfPtr) noexcept override;
void get(ExceptionOrValue& output) noexcept override;
void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
private:
enum State {
STEP1,
STEP2
};
State state;
Own<PromiseNode> inner;
// In STEP1, a PromiseNode for a Promise<T>.
// In STEP2, a PromiseNode for a T.
Event* onReadyEvent = nullptr;
Own<PromiseNode>* selfPtr = nullptr;
Maybe<Own<Event>> fire() override;
void traceEvent(TraceBuilder& builder) override;
};
template <typename T>
Own<PromiseNode> maybeChain(Own<PromiseNode>&& node, Promise<T>*, SourceLocation location) {
return heap<ChainPromiseNode>(kj::mv(node), location);
}
template <typename T>
Own<PromiseNode>&& maybeChain(Own<PromiseNode>&& node, T*, SourceLocation location) {
return kj::mv(node);
}
template <typename T, typename Result = decltype(T::reducePromise(instance<Promise<T>>()))>
inline Result maybeReduce(Promise<T>&& promise, bool) {
return T::reducePromise(kj::mv(promise));
}
template <typename T>
inline Promise<T> maybeReduce(Promise<T>&& promise, ...) {
return kj::mv(promise);
}
// -------------------------------------------------------------------
class ExclusiveJoinPromiseNode final: public PromiseNode {
public:
ExclusiveJoinPromiseNode(Own<PromiseNode> left, Own<PromiseNode> right, SourceLocation location);
~ExclusiveJoinPromiseNode() noexcept(false);
void onReady(Event* event) noexcept override;
void get(ExceptionOrValue& output) noexcept override;
void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
private:
class Branch: public Event {
public:
Branch(ExclusiveJoinPromiseNode& joinNode, Own<PromiseNode> dependency,
SourceLocation location);
~Branch() noexcept(false);
bool get(ExceptionOrValue& output);
// Returns true if this is the side that finished.
Maybe<Own<Event>> fire() override;
void traceEvent(TraceBuilder& builder) override;
private:
ExclusiveJoinPromiseNode& joinNode;
Own<PromiseNode> dependency;
friend class ExclusiveJoinPromiseNode;
};
Branch left;
Branch right;
OnReadyEvent onReadyEvent;
};
// -------------------------------------------------------------------
class ArrayJoinPromiseNodeBase: public PromiseNode {
public:
ArrayJoinPromiseNodeBase(Array<Own<PromiseNode>> promises,
ExceptionOrValue* resultParts, size_t partSize,
SourceLocation location);
~ArrayJoinPromiseNodeBase() noexcept(false);
void onReady(Event* event) noexcept override final;
void get(ExceptionOrValue& output) noexcept override final;
void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override final;
protected:
virtual void getNoError(ExceptionOrValue& output) noexcept = 0;
// Called to compile the result only in the case where there were no errors.
private:
uint countLeft;
OnReadyEvent onReadyEvent;
class Branch final: public Event {
public:
Branch(ArrayJoinPromiseNodeBase& joinNode, Own<PromiseNode> dependency,
ExceptionOrValue& output, SourceLocation location);
~Branch() noexcept(false);
Maybe<Own<Event>> fire() override;
void traceEvent(TraceBuilder& builder) override;
Maybe<Exception> getPart();
// Calls dependency->get(output). If there was an exception, return it.
private:
ArrayJoinPromiseNodeBase& joinNode;
Own<PromiseNode> dependency;
ExceptionOrValue& output;
friend class ArrayJoinPromiseNodeBase;
};
Array<Branch> branches;
};
template <typename T>
class ArrayJoinPromiseNode final: public ArrayJoinPromiseNodeBase {
public:
ArrayJoinPromiseNode(Array<Own<PromiseNode>> promises,
Array<ExceptionOr<T>> resultParts,
SourceLocation location)
: ArrayJoinPromiseNodeBase(kj::mv(promises), resultParts.begin(), sizeof(ExceptionOr<T>),
location),
resultParts(kj::mv(resultParts)) {}
protected:
void getNoError(ExceptionOrValue& output) noexcept override {
auto builder = heapArrayBuilder<T>(resultParts.size());
for (auto& part: resultParts) {
KJ_IASSERT(part.value != nullptr,
"Bug in KJ promise framework: Promise result had neither value no exception.");
builder.add(kj::mv(*_::readMaybe(part.value)));
}
output.as<Array<T>>() = builder.finish();
}
private:
Array<ExceptionOr<T>> resultParts;
};
template <>
class ArrayJoinPromiseNode<void> final: public ArrayJoinPromiseNodeBase {
public:
ArrayJoinPromiseNode(Array<Own<PromiseNode>> promises,
Array<ExceptionOr<_::Void>> resultParts,
SourceLocation location);
~ArrayJoinPromiseNode();
protected:
void getNoError(ExceptionOrValue& output) noexcept override;
private:
Array<ExceptionOr<_::Void>> resultParts;
};
// -------------------------------------------------------------------
class EagerPromiseNodeBase: public PromiseNode, protected Event {
// A PromiseNode that eagerly evaluates its dependency even if its dependent does not eagerly
// evaluate it.
public:
EagerPromiseNodeBase(Own<PromiseNode>&& dependency, ExceptionOrValue& resultRef,
SourceLocation location);
void onReady(Event* event) noexcept override;
void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
private:
Own<PromiseNode> dependency;
OnReadyEvent onReadyEvent;
ExceptionOrValue& resultRef;
Maybe<Own<Event>> fire() override;
void traceEvent(TraceBuilder& builder) override;
};
template <typename T>
class EagerPromiseNode final: public EagerPromiseNodeBase {
public:
EagerPromiseNode(Own<PromiseNode>&& dependency, SourceLocation location)
: EagerPromiseNodeBase(kj::mv(dependency), result, location) {}
void get(ExceptionOrValue& output) noexcept override {
output.as<T>() = kj::mv(result);
}
private:
ExceptionOr<T> result;
};
template <typename T>
Own<PromiseNode> spark(Own<PromiseNode>&& node, SourceLocation location) {
// Forces evaluation of the given node to begin as soon as possible, even if no one is waiting
// on it.
return heap<EagerPromiseNode<T>>(kj::mv(node), location);
}
// -------------------------------------------------------------------
class AdapterPromiseNodeBase: public PromiseNode {
public:
void onReady(Event* event) noexcept override;
void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
protected:
inline void setReady() {
onReadyEvent.arm();
}
private:
OnReadyEvent onReadyEvent;
};
template <typename T, typename Adapter>
class AdapterPromiseNode final: public AdapterPromiseNodeBase,
private PromiseFulfiller<UnfixVoid<T>> {
// A PromiseNode that wraps a PromiseAdapter.
public:
template <typename... Params>
AdapterPromiseNode(Params&&... params)
: adapter(static_cast<PromiseFulfiller<UnfixVoid<T>>&>(*this), kj::fwd<Params>(params)...) {}
void get(ExceptionOrValue& output) noexcept override {
KJ_IREQUIRE(!isWaiting());
output.as<T>() = kj::mv(result);
}
private:
ExceptionOr<T> result;
bool waiting = true;
Adapter adapter;
void fulfill(T&& value) override {
if (waiting) {
waiting = false;
result = ExceptionOr<T>(kj::mv(value));
setReady();
}
}
void reject(Exception&& exception) override {
if (waiting) {
waiting = false;
result = ExceptionOr<T>(false, kj::mv(exception));
setReady();
}
}
bool isWaiting() override {
return waiting;
}
};
// -------------------------------------------------------------------
class FiberBase: public PromiseNode, private Event {
// Base class for the outer PromiseNode representing a fiber.
public:
explicit FiberBase(size_t stackSize, _::ExceptionOrValue& result, SourceLocation location);
explicit FiberBase(const FiberPool& pool, _::ExceptionOrValue& result, SourceLocation location);
~FiberBase() noexcept(false);
void start() { armDepthFirst(); }
// Call immediately after construction to begin executing the fiber.
class WaitDoneEvent;
void onReady(_::Event* event) noexcept override;
void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
protected:
bool isFinished() { return state == FINISHED; }
void destroy();
private:
enum { WAITING, RUNNING, CANCELED, FINISHED } state;
_::PromiseNode* currentInner = nullptr;
OnReadyEvent onReadyEvent;
Own<FiberStack> stack;
_::ExceptionOrValue& result;
void run();
virtual void runImpl(WaitScope& waitScope) = 0;
Maybe<Own<Event>> fire() override;
void traceEvent(TraceBuilder& builder) override;
// Implements Event. Each time the event is fired, switchToFiber() is called.
friend class FiberStack;
friend void _::waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result,
WaitScope& waitScope, SourceLocation location);
friend bool _::pollImpl(_::PromiseNode& node, WaitScope& waitScope, SourceLocation location);
};
template <typename Func>
class Fiber final: public FiberBase {
public:
explicit Fiber(size_t stackSize, Func&& func, SourceLocation location)
: FiberBase(stackSize, result, location), func(kj::fwd<Func>(func)) {}
explicit Fiber(const FiberPool& pool, Func&& func, SourceLocation location)
: FiberBase(pool, result, location), func(kj::fwd<Func>(func)) {}
~Fiber() noexcept(false) { destroy(); }
typedef FixVoid<decltype(kj::instance<Func&>()(kj::instance<WaitScope&>()))> ResultType;
void get(ExceptionOrValue& output) noexcept override {
KJ_IREQUIRE(isFinished());
output.as<ResultType>() = kj::mv(result);
}
private:
Func func;
ExceptionOr<ResultType> result;
void runImpl(WaitScope& waitScope) override {
result.template as<ResultType>() =
MaybeVoidCaller<WaitScope&, ResultType>::apply(func, waitScope);
}
};
} // namespace _ (private)
// =======================================================================================
template <typename T>
Promise<T>::Promise(_::FixVoid<T> value)
: PromiseBase(heap<_::ImmediatePromiseNode<_::FixVoid<T>>>(kj::mv(value))) {}
template <typename T>
Promise<T>::Promise(kj::Exception&& exception)
: PromiseBase(heap<_::ImmediateBrokenPromiseNode>(kj::mv(exception))) {}
template <typename T>
template <typename Func, typename ErrorFunc>
PromiseForResult<Func, T> Promise<T>::then(Func&& func, ErrorFunc&& errorHandler,
SourceLocation location) {
typedef _::FixVoid<_::ReturnType<Func, T>> ResultT;
void* continuationTracePtr = _::GetFunctorStartAddress<_::FixVoid<T>&&>::apply(func);
Own<_::PromiseNode> intermediate =
heap<_::TransformPromiseNode<ResultT, _::FixVoid<T>, Func, ErrorFunc>>(
kj::mv(node), kj::fwd<Func>(func), kj::fwd<ErrorFunc>(errorHandler),
continuationTracePtr);
auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType<Func, T>>>(
_::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr), location));
return _::maybeReduce(kj::mv(result), false);
}
namespace _ { // private
template <typename T>
struct IdentityFunc {
inline T operator()(T&& value) const {
return kj::mv(value);
}
};
template <typename T>
struct IdentityFunc<Promise<T>> {
inline Promise<T> operator()(T&& value) const {
return kj::mv(value);
}
};
template <>
struct IdentityFunc<void> {
inline void operator()() const {}
};
template <>
struct IdentityFunc<Promise<void>> {
Promise<void> operator()() const;
// This can't be inline because it will make the translation unit depend on kj-async. Awkwardly,
// Cap'n Proto relies on being able to include this header without creating such a link-time
// dependency.
};
} // namespace _ (private)
template <typename T>
template <typename ErrorFunc>
Promise<T> Promise<T>::catch_(ErrorFunc&& errorHandler, SourceLocation location) {
// then()'s ErrorFunc can only return a Promise if Func also returns a Promise. In this case,
// Func is being filled in automatically. We want to make sure ErrorFunc can return a Promise,
// but we don't want the extra overhead of promise chaining if ErrorFunc doesn't actually
// return a promise. So we make our Func return match ErrorFunc.
typedef _::IdentityFunc<decltype(errorHandler(instance<Exception&&>()))> Func;
typedef _::FixVoid<_::ReturnType<Func, T>> ResultT;
// The reason catch_() isn't simply implemented in terms of then() is because we want the trace
// pointer to be based on ErrorFunc rather than Func.
void* continuationTracePtr = _::GetFunctorStartAddress<kj::Exception&&>::apply(errorHandler);
Own<_::PromiseNode> intermediate =
heap<_::TransformPromiseNode<ResultT, _::FixVoid<T>, Func, ErrorFunc>>(
kj::mv(node), Func(), kj::fwd<ErrorFunc>(errorHandler), continuationTracePtr);
auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType<Func, T>>>(
_::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr), location));
return _::maybeReduce(kj::mv(result), false);
}
template <typename T>
T Promise<T>::wait(WaitScope& waitScope, SourceLocation location) {
_::ExceptionOr<_::FixVoid<T>> result;
_::waitImpl(kj::mv(node), result, waitScope, location);
return convertToReturn(kj::mv(result));
}
template <typename T>
bool Promise<T>::poll(WaitScope& waitScope, SourceLocation location) {
return _::pollImpl(*node, waitScope, location);
}
template <typename T>
ForkedPromise<T> Promise<T>::fork(SourceLocation location) {
return ForkedPromise<T>(false, refcounted<_::ForkHub<_::FixVoid<T>>>(kj::mv(node), location));
}
template <typename T>
Promise<T> ForkedPromise<T>::addBranch() {
return hub->addBranch();
}
template <typename T>
bool ForkedPromise<T>::hasBranches() {
return hub->isShared();
}
template <typename T>
_::SplitTuplePromise<T> Promise<T>::split(SourceLocation location) {
return refcounted<_::ForkHub<_::FixVoid<T>>>(kj::mv(node), location)->split(location);
}
template <typename T>
Promise<T> Promise<T>::exclusiveJoin(Promise<T>&& other, SourceLocation location) {
return Promise(false, heap<_::ExclusiveJoinPromiseNode>(
kj::mv(node), kj::mv(other.node), location));
}
template <typename T>
template <typename... Attachments>
Promise<T> Promise<T>::attach(Attachments&&... attachments) {
return Promise(false, kj::heap<_::AttachmentPromiseNode<Tuple<Attachments...>>>(
kj::mv(node), kj::tuple(kj::fwd<Attachments>(attachments)...)));
}
template <typename T>
template <typename ErrorFunc>
Promise<T> Promise<T>::eagerlyEvaluate(ErrorFunc&& errorHandler, SourceLocation location) {
// See catch_() for commentary.
return Promise(false, _::spark<_::FixVoid<T>>(then(
_::IdentityFunc<decltype(errorHandler(instance<Exception&&>()))>(),
kj::fwd<ErrorFunc>(errorHandler)).node, location));
}
template <typename T>
Promise<T> Promise<T>::eagerlyEvaluate(decltype(nullptr), SourceLocation location) {
return Promise(false, _::spark<_::FixVoid<T>>(kj::mv(node), location));
}
template <typename T>
kj::String Promise<T>::trace() {
return PromiseBase::trace();
}
template <typename Func>
inline PromiseForResult<Func, void> evalLater(Func&& func) {
return _::yield().then(kj::fwd<Func>(func), _::PropagateException());
}
template <typename Func>
inline PromiseForResult<Func, void> evalLast(Func&& func) {
return _::yieldHarder().then(kj::fwd<Func>(func), _::PropagateException());
}
template <typename Func>
inline PromiseForResult<Func, void> evalNow(Func&& func) {
PromiseForResult<Func, void> result = nullptr;
KJ_IF_MAYBE(e, kj::runCatchingExceptions([&]() {
result = func();
})) {
result = kj::mv(*e);
}
return result;
}
template <typename Func>
struct RetryOnDisconnect_ {
static inline PromiseForResult<Func, void> apply(Func&& func) {
return evalLater([func = kj::mv(func)]() mutable -> PromiseForResult<Func, void> {
auto promise = evalNow(func);
return promise.catch_([func = kj::mv(func)](kj::Exception&& e) mutable -> PromiseForResult<Func, void> {
if (e.getType() == kj::Exception::Type::DISCONNECTED) {
return func();
} else {
return kj::mv(e);
}
});
});
}
};
template <typename Func>
struct RetryOnDisconnect_<Func&> {
// Specialization for references. Needed because the syntax for capturing references in a
// lambda is different. :(
static inline PromiseForResult<Func, void> apply(Func& func) {
auto promise = evalLater(func);
return promise.catch_([&func](kj::Exception&& e) -> PromiseForResult<Func, void> {
if (e.getType() == kj::Exception::Type::DISCONNECTED) {
return func();
} else {
return kj::mv(e);
}
});
}
};
template <typename Func>
inline PromiseForResult<Func, void> retryOnDisconnect(Func&& func) {
return RetryOnDisconnect_<Func>::apply(kj::fwd<Func>(func));
}
template <typename Func>
inline PromiseForResult<Func, WaitScope&> startFiber(
size_t stackSize, Func&& func, SourceLocation location) {
typedef _::FixVoid<_::ReturnType<Func, WaitScope&>> ResultT;
Own<_::FiberBase> intermediate = kj::heap<_::Fiber<Func>>(
stackSize, kj::fwd<Func>(func), location);
intermediate->start();
auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType<Func, WaitScope&>>>(
_::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr), location));
return _::maybeReduce(kj::mv(result), false);
}
template <typename Func>
inline PromiseForResult<Func, WaitScope&> FiberPool::startFiber(
Func&& func, SourceLocation location) const {
typedef _::FixVoid<_::ReturnType<Func, WaitScope&>> ResultT;
Own<_::FiberBase> intermediate = kj::heap<_::Fiber<Func>>(*this, kj::fwd<Func>(func), location);
intermediate->start();
auto result = _::PromiseNode::to<_::ChainPromises<_::ReturnType<Func, WaitScope&>>>(
_::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr), location));
return _::maybeReduce(kj::mv(result), false);
}
template <typename T>
template <typename ErrorFunc>
void Promise<T>::detach(ErrorFunc&& errorHandler) {
return _::detach(then([](T&&) {}, kj::fwd<ErrorFunc>(errorHandler)));
}
template <>
template <typename ErrorFunc>
void Promise<void>::detach(ErrorFunc&& errorHandler) {
return _::detach(then([]() {}, kj::fwd<ErrorFunc>(errorHandler)));
}
template <typename T>
Promise<Array<T>> joinPromises(Array<Promise<T>>&& promises, SourceLocation location) {
return _::PromiseNode::to<Promise<Array<T>>>(kj::heap<_::ArrayJoinPromiseNode<T>>(
KJ_MAP(p, promises) { return _::PromiseNode::from(kj::mv(p)); },
heapArray<_::ExceptionOr<T>>(promises.size()), location));
}
// =======================================================================================
namespace _ { // private
class WeakFulfillerBase: protected kj::Disposer {
protected:
WeakFulfillerBase(): inner(nullptr) {}
virtual ~WeakFulfillerBase() noexcept(false) {}
template <typename T>
inline PromiseFulfiller<T>* getInner() {
return static_cast<PromiseFulfiller<T>*>(inner);
};
template <typename T>
inline void setInner(PromiseFulfiller<T>* ptr) {
inner = ptr;
};
private:
mutable PromiseRejector* inner;
void disposeImpl(void* pointer) const override;
};
template <typename T>
class WeakFulfiller final: public PromiseFulfiller<T>, public WeakFulfillerBase {
// A wrapper around PromiseFulfiller which can be detached.
//
// There are a couple non-trivialities here:
// - If the WeakFulfiller is discarded, we want the promise it fulfills to be implicitly
// rejected.
// - We cannot destroy the WeakFulfiller until the application has discarded it *and* it has been
// detached from the underlying fulfiller, because otherwise the later detach() call will go
// to a dangling pointer. Essentially, WeakFulfiller is reference counted, although the
// refcount never goes over 2 and we manually implement the refcounting because we need to do
// other special things when each side detaches anyway. To this end, WeakFulfiller is its own
// Disposer -- dispose() is called when the application discards its owned pointer to the
// fulfiller and detach() is called when the promise is destroyed.
public:
KJ_DISALLOW_COPY(WeakFulfiller);
static kj::Own<WeakFulfiller> make() {
WeakFulfiller* ptr = new WeakFulfiller;
return Own<WeakFulfiller>(ptr, *ptr);
}
void fulfill(FixVoid<T>&& value) override {
if (getInner<T>() != nullptr) {
getInner<T>()->fulfill(kj::mv(value));
}
}
void reject(Exception&& exception) override {
if (getInner<T>() != nullptr) {
getInner<T>()->reject(kj::mv(exception));
}
}
bool isWaiting() override {
return getInner<T>() != nullptr && getInner<T>()->isWaiting();
}
void attach(PromiseFulfiller<T>& newInner) {
setInner<T>(&newInner);
}
void detach(PromiseFulfiller<T>& from) {
if (getInner<T>() == nullptr) {
// Already disposed.
delete this;
} else {
KJ_IREQUIRE(getInner<T>() == &from);
setInner<T>(nullptr);
}
}
private:
WeakFulfiller() {}
};
template <typename T>
class PromiseAndFulfillerAdapter {
public:
PromiseAndFulfillerAdapter(PromiseFulfiller<T>& fulfiller,
WeakFulfiller<T>& wrapper)
: fulfiller(fulfiller), wrapper(wrapper) {
wrapper.attach(fulfiller);
}
~PromiseAndFulfillerAdapter() noexcept(false) {
wrapper.detach(fulfiller);
}
private:
PromiseFulfiller<T>& fulfiller;
WeakFulfiller<T>& wrapper;
};
} // namespace _ (private)
template <typename T>
template <typename Func>
bool PromiseFulfiller<T>::rejectIfThrows(Func&& func) {
KJ_IF_MAYBE(exception, kj::runCatchingExceptions(kj::mv(func))) {
reject(kj::mv(*exception));
return false;
} else {
return true;
}
}
template <typename Func>
bool PromiseFulfiller<void>::rejectIfThrows(Func&& func) {
KJ_IF_MAYBE(exception, kj::runCatchingExceptions(kj::mv(func))) {
reject(kj::mv(*exception));
return false;
} else {
return true;
}
}
template <typename T, typename Adapter, typename... Params>
_::ReducePromises<T> newAdaptedPromise(Params&&... adapterConstructorParams) {
Own<_::PromiseNode> intermediate(
heap<_::AdapterPromiseNode<_::FixVoid<T>, Adapter>>(
kj::fwd<Params>(adapterConstructorParams)...));
// We can't capture SourceLocation in this function's arguments since it is a vararg template. :(
return _::PromiseNode::to<_::ReducePromises<T>>(
_::maybeChain(kj::mv(intermediate), implicitCast<T*>(nullptr), SourceLocation()));
}
template <typename T>
PromiseFulfillerPair<T> newPromiseAndFulfiller(SourceLocation location) {
auto wrapper = _::WeakFulfiller<T>::make();
Own<_::PromiseNode> intermediate(
heap<_::AdapterPromiseNode<_::FixVoid<T>, _::PromiseAndFulfillerAdapter<T>>>(*wrapper));
auto promise = _::PromiseNode::to<_::ReducePromises<T>>(
_::maybeChain(kj::mv(intermediate), implicitCast<T*>(nullptr), location));
return PromiseFulfillerPair<T> { kj::mv(promise), kj::mv(wrapper) };
}
// =======================================================================================
// cross-thread stuff
namespace _ { // (private)
class XThreadEvent: private Event, // it's an event in the target thread
public PromiseNode { // it's a PromiseNode in the requesting thread
public:
XThreadEvent(ExceptionOrValue& result, const Executor& targetExecutor, void* funcTracePtr,
SourceLocation location);
void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
protected:
void ensureDoneOrCanceled();
// MUST be called in destructor of subclasses to make sure the object is not destroyed while
// still being accessed by the other thread. (This can't be placed in ~XThreadEvent() because
// that destructor doesn't run until the subclass has already been destroyed.)
virtual kj::Maybe<Own<PromiseNode>> execute() = 0;
// Run the function. If the function returns a promise, returns the inner PromiseNode, otherwise
// returns null.
// implements PromiseNode ----------------------------------------------------
void onReady(Event* event) noexcept override;
private:
ExceptionOrValue& result;
void* funcTracePtr;
kj::Own<const Executor> targetExecutor;
Maybe<const Executor&> replyExecutor; // If executeAsync() was used.
kj::Maybe<Own<PromiseNode>> promiseNode;
// Accessed only in target thread.
ListLink<XThreadEvent> targetLink;
// Membership in one of the linked lists in the target Executor's work list or cancel list. These
// fields are protected by the target Executor's mutex.
enum {
UNUSED,
// Object was never queued on another thread.
QUEUED,
// Target thread has not yet dequeued the event from the state.start list. The requesting
// thread can cancel execution by removing the event from the list.
EXECUTING,
// Target thread has dequeued the event from state.start and moved it to state.executing. To
// cancel, the requesting thread must add the event to the state.cancel list and change the
// state to CANCELING.
CANCELING,
// Requesting thread is trying to cancel this event. The target thread will change the state to
// `DONE` once canceled.
DONE
// Target thread has completed handling this event and will not touch it again. The requesting
// thread can safely delete the object. The `state` is updated to `DONE` using an atomic
// release operation after ensuring that the event will not be touched again, so that the
// requesting can safely skip locking if it observes the state is already DONE.
} state = UNUSED;
// State, which is also protected by `targetExecutor`'s mutex.
ListLink<XThreadEvent> replyLink;
// Membership in `replyExecutor`'s reply list. Protected by `replyExecutor`'s mutex. The
// executing thread places the event in the reply list near the end of the `EXECUTING` state.
// Because the thread cannot lock two mutexes at once, it's possible that the reply executor
// will receive the reply while the event is still listed in the EXECUTING state, but it can
// ignore the state and proceed with the result.
OnReadyEvent onReadyEvent;
// Accessed only in requesting thread.
friend class kj::Executor;
void done();
// Sets the state to `DONE` and notifies the originating thread that this event is done. Do NOT
// call under lock.
void sendReply();
// Notifies the originating thread that this event is done, but doesn't set the state to DONE
// yet. Do NOT call under lock.
void setDoneState();
// Assigns `state` to `DONE`, being careful to use an atomic-release-store if needed. This must
// only be called in the destination thread, and must either be called under lock, or the thread
// must take the lock and release it again shortly after setting the state (because some threads
// may be waiting on the DONE state using a conditional wait on the mutex). After calling
// setDoneState(), the destination thread MUST NOT touch this object ever again; it now belongs
// solely to the requesting thread.
void setDisconnected();
// Sets the result to a DISCONNECTED exception indicating that the target event loop exited.
class DelayedDoneHack;
// implements Event ----------------------------------------------------------
Maybe<Own<Event>> fire() override;
// If called with promiseNode == nullptr, it's time to call execute(). If promiseNode != nullptr,
// then it just indicated readiness and we need to get its result.
void traceEvent(TraceBuilder& builder) override;
};
template <typename Func, typename = _::FixVoid<_::ReturnType<Func, void>>>
class XThreadEventImpl final: public XThreadEvent {
// Implementation for a function that does not return a Promise.
public:
XThreadEventImpl(Func&& func, const Executor& target, SourceLocation location)
: XThreadEvent(result, target, GetFunctorStartAddress<>::apply(func), location),
func(kj::fwd<Func>(func)) {}
~XThreadEventImpl() noexcept(false) { ensureDoneOrCanceled(); }
typedef _::FixVoid<_::ReturnType<Func, void>> ResultT;
kj::Maybe<Own<_::PromiseNode>> execute() override {
result.value = MaybeVoidCaller<Void, FixVoid<decltype(func())>>::apply(func, Void());
return nullptr;
}
// implements PromiseNode ----------------------------------------------------
void get(ExceptionOrValue& output) noexcept override {
output.as<ResultT>() = kj::mv(result);
}
private:
Func func;
ExceptionOr<ResultT> result;
friend Executor;
};
template <typename Func, typename T>
class XThreadEventImpl<Func, Promise<T>> final: public XThreadEvent {
// Implementation for a function that DOES return a Promise.
public:
XThreadEventImpl(Func&& func, const Executor& target, SourceLocation location)
: XThreadEvent(result, target, GetFunctorStartAddress<>::apply(func), location),
func(kj::fwd<Func>(func)) {}
~XThreadEventImpl() noexcept(false) { ensureDoneOrCanceled(); }
typedef _::FixVoid<_::UnwrapPromise<PromiseForResult<Func, void>>> ResultT;
kj::Maybe<Own<_::PromiseNode>> execute() override {
auto result = _::PromiseNode::from(func());
KJ_IREQUIRE(result.get() != nullptr);
return kj::mv(result);
}
// implements PromiseNode ----------------------------------------------------
void get(ExceptionOrValue& output) noexcept override {
output.as<ResultT>() = kj::mv(result);
}
private:
Func func;
ExceptionOr<ResultT> result;
friend Executor;
};
} // namespace _ (private)
template <typename Func>
_::UnwrapPromise<PromiseForResult<Func, void>> Executor::executeSync(
Func&& func, SourceLocation location) const {
_::XThreadEventImpl<Func> event(kj::fwd<Func>(func), *this, location);
send(event, true);
return convertToReturn(kj::mv(event.result));
}
template <typename Func>
PromiseForResult<Func, void> Executor::executeAsync(Func&& func, SourceLocation location) const {
auto event = kj::heap<_::XThreadEventImpl<Func>>(kj::fwd<Func>(func), *this, location);
send(*event, false);
return _::PromiseNode::to<PromiseForResult<Func, void>>(kj::mv(event));
}
// -----------------------------------------------------------------------------
namespace _ { // (private)
template <typename T>
class XThreadFulfiller;
class XThreadPaf: public PromiseNode {
public:
XThreadPaf();
virtual ~XThreadPaf() noexcept(false);
class Disposer: public kj::Disposer {
public:
void disposeImpl(void* pointer) const override;
};
static const Disposer DISPOSER;
// implements PromiseNode ----------------------------------------------------
void onReady(Event* event) noexcept override;
void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
private:
enum {
WAITING,
// Not yet fulfilled, and the waiter is still waiting.
//
// Starting from this state, the state may transition to either FULFILLING or CANCELED
// using an atomic compare-and-swap.
FULFILLING,
// The fulfiller thread atomically transitions the state from WAITING to FULFILLING when it
// wishes to fulfill the promise. By doing so, it guarantees that the `executor` will not
// disappear out from under it. It then fills in the result value, locks the executor mutex,
// adds the object to the executor's list of fulfilled XThreadPafs, changes the state to
// FULFILLED, and finally unlocks the mutex.
//
// If the waiting thread tries to cancel but discovers the object in this state, then it
// must perform a conditional wait on the executor mutex to await the state becoming FULFILLED.
// It can then delete the object.
FULFILLED,
// The fulfilling thread has completed filling in the result value and inserting the object
// into the waiting thread's executor event queue. Moreover, the fulfilling thread no longer
// holds any pointers to this object. The waiting thread is responsible for deleting it.
DISPATCHED,
// The object reached FULFILLED state, and then was dispatched from the waiting thread's
// executor's event queue. Therefore, the object is completely owned by the waiting thread with
// no need to lock anything.
CANCELED
// The waiting thread atomically transitions the state from WAITING to CANCELED if it is no
// longer listening. In this state, it is the fulfiller thread's responsibility to destroy the
// object.
} state;
const Executor& executor;
// Executor of the waiting thread. Only guaranteed to be valid when state is `WAITING` or
// `FULFILLING`. After any other state has been reached, this reference may be invalidated.
ListLink<XThreadPaf> link;
// In the FULFILLING/FULFILLED states, the object is placed in a linked list within the waiting
// thread's executor. In those states, these pointers are guarded by said executor's mutex.
OnReadyEvent onReadyEvent;
class FulfillScope;
static kj::Exception unfulfilledException();
// Construct appropriate exception to use to reject an unfulfilled XThreadPaf.
template <typename T>
friend class XThreadFulfiller;
friend Executor;
};
template <typename T>
class XThreadPafImpl final: public XThreadPaf {
public:
// implements PromiseNode ----------------------------------------------------
void get(ExceptionOrValue& output) noexcept override {
output.as<FixVoid<T>>() = kj::mv(result);
}
private:
ExceptionOr<FixVoid<T>> result;
friend class XThreadFulfiller<T>;
};
class XThreadPaf::FulfillScope {
// Create on stack while setting `XThreadPafImpl<T>::result`.
//
// This ensures that:
// - Only one call is carried out, even if multiple threads try to fulfill concurrently.
// - The waiting thread is correctly signaled.
public:
FulfillScope(XThreadPaf** pointer);
// Atomically nulls out *pointer and takes ownership of the pointer.
~FulfillScope() noexcept(false);
KJ_DISALLOW_COPY(FulfillScope);
bool shouldFulfill() { return obj != nullptr; }
template <typename T>
XThreadPafImpl<T>* getTarget() { return static_cast<XThreadPafImpl<T>*>(obj); }
private:
XThreadPaf* obj;
};
template <typename T>
class XThreadFulfiller final: public CrossThreadPromiseFulfiller<T> {
public:
XThreadFulfiller(XThreadPafImpl<T>* target): target(target) {}
~XThreadFulfiller() noexcept(false) {
if (target != nullptr) {
reject(XThreadPaf::unfulfilledException());
}
}
void fulfill(FixVoid<T>&& value) const override {
XThreadPaf::FulfillScope scope(&target);
if (scope.shouldFulfill()) {
scope.getTarget<T>()->result = kj::mv(value);
}
}
void reject(Exception&& exception) const override {
XThreadPaf::FulfillScope scope(&target);
if (scope.shouldFulfill()) {
scope.getTarget<T>()->result.addException(kj::mv(exception));
}
}
bool isWaiting() const override {
KJ_IF_MAYBE(t, target) {
#if _MSC_VER && !__clang__
// Just assume 1-byte loads are atomic... on what kind of absurd platform would they not be?
return t->state == XThreadPaf::WAITING;
#else
return __atomic_load_n(&t->state, __ATOMIC_RELAXED) == XThreadPaf::WAITING;
#endif
} else {
return false;
}
}
private:
mutable XThreadPaf* target; // accessed using atomic ops
};
template <typename T>
class XThreadFulfiller<kj::Promise<T>> {
public:
static_assert(sizeof(T) < 0,
"newCrosssThreadPromiseAndFulfiller<Promise<T>>() is not currently supported");
// TODO(someday): Is this worth supporting? Presumably, when someone calls `fulfill(somePromise)`,
// then `somePromise` should be assumed to be a promise owned by the fulfilling thread, not
// the waiting thread.
};
} // namespace _ (private)
template <typename T>
PromiseCrossThreadFulfillerPair<T> newPromiseAndCrossThreadFulfiller() {
kj::Own<_::XThreadPafImpl<T>> node(new _::XThreadPafImpl<T>, _::XThreadPaf::DISPOSER);
auto fulfiller = kj::heap<_::XThreadFulfiller<T>>(node);
return { _::PromiseNode::to<_::ReducePromises<T>>(kj::mv(node)), kj::mv(fulfiller) };
}
} // namespace kj
#if KJ_HAS_COROUTINE
// =======================================================================================
// Coroutines TS integration with kj::Promise<T>.
//
// Here's a simple coroutine:
//
// Promise<Own<AsyncIoStream>> connectToService(Network& n) {
// auto a = co_await n.parseAddress(IP, PORT);
// auto c = co_await a->connect();
// co_return kj::mv(c);
// }
//
// The presence of the co_await and co_return keywords tell the compiler it is a coroutine.
// Although it looks similar to a function, it has a couple large differences. First, everything
// that would normally live in the stack frame lives instead in a heap-based coroutine frame.
// Second, the coroutine has the ability to return from its scope without deallocating this frame
// (to suspend, in other words), and the ability to resume from its last suspension point.
//
// In order to know how to suspend, resume, and return from a coroutine, the compiler looks up a
// coroutine implementation type via a traits class parameterized by the coroutine return and
// parameter types. We'll name our coroutine implementation `kj::_::Coroutine<T>`,
namespace kj::_ { template <typename T> class Coroutine; }
// Specializing the appropriate traits class tells the compiler about `kj::_::Coroutine<T>`.
namespace KJ_COROUTINE_STD_NAMESPACE {
template <class T, class... Args>
struct coroutine_traits<kj::Promise<T>, Args...> {
// `Args...` are the coroutine's parameter types.
using promise_type = kj::_::Coroutine<T>;
// The Coroutines TS calls this the "promise type". This makes sense when thinking of coroutines
// returning `std::future<T>`, since the coroutine implementation would be a wrapper around
// a `std::promise<T>`. It's extremely confusing from a KJ perspective, however, so I call it
// the "coroutine implementation type" instead.
};
} // namespace KJ_COROUTINE_STD_NAMESPACE
// Now when the compiler sees our `connectToService()` coroutine above, it default-constructs a
// `coroutine_traits<Promise<Own<AsyncIoStream>>, Network&>::promise_type`, or
// `kj::_::Coroutine<Own<AsyncIoStream>>`.
//
// The implementation object lives in the heap-allocated coroutine frame. It gets destroyed and
// deallocated when the frame does.
namespace kj::_ {
namespace stdcoro = KJ_COROUTINE_STD_NAMESPACE;
class CoroutineBase: public PromiseNode,
public Event,
public Disposer {
public:
CoroutineBase(stdcoro::coroutine_handle<> coroutine, ExceptionOrValue& resultRef,
SourceLocation location);
~CoroutineBase() noexcept(false);
KJ_DISALLOW_COPY(CoroutineBase);
auto initial_suspend() { return stdcoro::suspend_never(); }
auto final_suspend() noexcept { return stdcoro::suspend_always(); }
// These adjust the suspension behavior of coroutines immediately upon initiation, and immediately
// after completion.
//
// The initial suspension point could allow us to defer the initial synchronous execution of a
// coroutine -- everything before its first co_await, that is.
//
// The final suspension point is useful to delay deallocation of the coroutine frame to match the
// lifetime of the enclosing promise.
void unhandled_exception();
protected:
class AwaiterBase;
bool isWaiting() { return waiting; }
void scheduleResumption() {
onReadyEvent.arm();
waiting = false;
}
private:
// -------------------------------------------------------
// PromiseNode implementation
void onReady(Event* event) noexcept override;
void tracePromise(TraceBuilder& builder, bool stopAtNextEvent) override;
// -------------------------------------------------------
// Event implementation
Maybe<Own<Event>> fire() override;
void traceEvent(TraceBuilder& builder) override;
// -------------------------------------------------------
// Disposer implementation
void disposeImpl(void* pointer) const override;
void destroy();
stdcoro::coroutine_handle<> coroutine;
ExceptionOrValue& resultRef;
OnReadyEvent onReadyEvent;
bool waiting = true;
bool hasSuspendedAtLeastOnce = false;
Maybe<PromiseNode&> promiseNodeForTrace;
// Whenever this coroutine is suspended waiting on another promise, we keep a reference to that
// promise so tracePromise()/traceEvent() can trace into it.
UnwindDetector unwindDetector;
struct DisposalResults {
bool destructorRan = false;
Maybe<Exception> exception;
};
Maybe<DisposalResults&> maybeDisposalResults;
// Only non-null during destruction. Before calling coroutine.destroy(), our disposer sets this
// to point to a DisposalResults on the stack so unhandled_exception() will have some place to
// store unwind exceptions. We can't store them in this Coroutine, because we'll be destroyed once
// coroutine.destroy() has returned. Our disposer then rethrows as needed.
};
template <typename Self, typename T>
class CoroutineMixin;
// CRTP mixin, covered later.
template <typename T>
class Coroutine final: public CoroutineBase,
public CoroutineMixin<Coroutine<T>, T> {
// The standard calls this the `promise_type` object. We can call this the "coroutine
// implementation object" since the word promise means different things in KJ and std styles. This
// is where we implement how a `kj::Promise<T>` is returned from a coroutine, and how that promise
// is later fulfilled. We also fill in a few lifetime-related details.
//
// The implementation object is also where we can customize memory allocation of coroutine frames,
// by implementing a member `operator new(size_t, Args...)` (same `Args...` as in
// coroutine_traits).
//
// We can also customize how await-expressions are transformed within `kj::Promise<T>`-based
// coroutines by implementing an `await_transform(P)` member function, where `P` is some type for
// which we want to implement co_await support, e.g. `kj::Promise<U>`. This feature allows us to
// provide an optimized `kj::EventLoop` integration when the coroutine's return type and the
// await-expression's type are both `kj::Promise` instantiations -- see further comments under
// `await_transform()`.
public:
using Handle = stdcoro::coroutine_handle<Coroutine<T>>;
Coroutine(SourceLocation location = {})
: CoroutineBase(Handle::from_promise(*this), result, location) {}
Promise<T> get_return_object() {
// Called after coroutine frame construction and before initial_suspend() to create the
// coroutine's return object. `this` itself lives inside the coroutine frame, and we arrange for
// the returned Promise<T> to own `this` via a custom Disposer and by always leaving the
// coroutine in a suspended state.
return PromiseNode::to<Promise<T>>(Own<PromiseNode>(this, *this));
}
public:
template <typename U>
class Awaiter;
template <typename U>
Awaiter<U> await_transform(kj::Promise<U>& promise) { return Awaiter<U>(kj::mv(promise)); }
template <typename U>
Awaiter<U> await_transform(kj::Promise<U>&& promise) { return Awaiter<U>(kj::mv(promise)); }
// Called when someone writes `co_await promise`, where `promise` is a kj::Promise<U>. We return
// an Awaiter<U>, which implements coroutine suspension and resumption in terms of the KJ async
// event system.
//
// There is another hook we could implement: an `operator co_await()` free function. However, a
// free function would be unaware of the type of the enclosing coroutine. Since Awaiter<U> is a
// member class template of Coroutine<T>, it is able to implement an
// `await_suspend(Coroutine<T>::Handle)` override, providing it type-safe access to our enclosing
// coroutine's PromiseNode. An `operator co_await()` free function would have to implement
// a type-erased `await_suspend(stdcoro::coroutine_handle<void>)` override, and implement
// suspension and resumption in terms of .then(). Yuck!
private:
// -------------------------------------------------------
// PromiseNode implementation
void get(ExceptionOrValue& output) noexcept override {
output.as<FixVoid<T>>() = kj::mv(result);
}
void fulfill(FixVoid<T>&& value) {
// Called by the return_value()/return_void() functions in our mixin class.
if (isWaiting()) {
result = kj::mv(value);
scheduleResumption();
}
}
ExceptionOr<FixVoid<T>> result;
friend class CoroutineMixin<Coroutine<T>, T>;
};
template <typename Self, typename T>
class CoroutineMixin {
public:
void return_value(T value) {
static_cast<Self*>(this)->fulfill(kj::mv(value));
}
};
template <typename Self>
class CoroutineMixin<Self, void> {
public:
void return_void() {
static_cast<Self*>(this)->fulfill(_::Void());
}
};
// The Coroutines spec has no `_::FixVoid<T>` equivalent to unify valueful and valueless co_return
// statements, and programs are ill-formed if the coroutine implementation object (Coroutine<T>) has
// both a `return_value()` and `return_void()`. No amount of EnableIffery can get around it, so
// these return_* functions live in a CRTP mixin.
class CoroutineBase::AwaiterBase {
public:
explicit AwaiterBase(Own<PromiseNode> node);
AwaiterBase(AwaiterBase&&);
~AwaiterBase() noexcept(false);
KJ_DISALLOW_COPY(AwaiterBase);
bool await_ready() const { return false; }
// This could return "`node->get()` is safe to call" instead, which would make suspension-less
// co_awaits possible for immediately-fulfilled promises. However, we need an Event to figure that
// out, and we won't have access to the Coroutine Event until await_suspend() is called. So, we
// must return false here. Fortunately, await_suspend() has a trick up its sleeve to enable
// suspension-less co_awaits.
protected:
void getImpl(ExceptionOrValue& result);
bool awaitSuspendImpl(CoroutineBase& coroutineEvent);
private:
UnwindDetector unwindDetector;
Own<PromiseNode> node;
Maybe<CoroutineBase&> maybeCoroutineEvent;
// If we do suspend waiting for our wrapped promise, we store a reference to `node` in our
// enclosing Coroutine for tracing purposes. To guard against any edge cases where an async stack
// trace is generated when an Awaiter was destroyed without Coroutine::fire() having been called,
// we need our own reference to the enclosing Coroutine. (I struggle to think up any such
// scenarios, but perhaps they could occur when destroying a suspended coroutine.)
};
template <typename T>
template <typename U>
class Coroutine<T>::Awaiter: public AwaiterBase {
// Wrapper around a co_await'ed promise and some storage space for the result of that promise.
// The compiler arranges to call our await_suspend() to suspend, which arranges to be woken up
// when the awaited promise is settled. Once that happens, the enclosing coroutine's Event
// implementation resumes the coroutine, which transitively calls await_resume() to unwrap the
// awaited promise result.
public:
explicit Awaiter(Promise<U> promise): AwaiterBase(PromiseNode::from(kj::mv(promise))) {}
U await_resume() {
getImpl(result);
auto value = kj::_::readMaybe(result.value);
KJ_IASSERT(value != nullptr, "Neither exception nor value present.");
return U(kj::mv(*value));
}
bool await_suspend(Coroutine::Handle coroutine) {
return awaitSuspendImpl(coroutine.promise());
}
private:
ExceptionOr<FixVoid<U>> result;
};
#undef KJ_COROUTINE_STD_NAMESPACE
} // namespace kj::_ (private)
#endif // KJ_HAS_COROUTINE
KJ_END_HEADER