| // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors |
| // Licensed under the MIT License: |
| // |
| // Permission is hereby granted, free of charge, to any person obtaining a copy |
| // of this software and associated documentation files (the "Software"), to deal |
| // in the Software without restriction, including without limitation the rights |
| // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
| // copies of the Software, and to permit persons to whom the Software is |
| // furnished to do so, subject to the following conditions: |
| // |
| // The above copyright notice and this permission notice shall be included in |
| // all copies or substantial portions of the Software. |
| // |
| // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
| // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
| // THE SOFTWARE. |
| |
| #pragma once |
| |
| #include "async-prelude.h" |
| #include "exception.h" |
| #include "refcount.h" |
| |
| KJ_BEGIN_HEADER |
| |
| #ifndef KJ_USE_FIBERS |
| #if __BIONIC__ || __FreeBSD__ || __OpenBSD__ || KJ_NO_EXCEPTIONS |
| // These platforms don't support fibers (nor do builds without exceptions), so default to off. |
| #define KJ_USE_FIBERS 0 |
| #else |
| #define KJ_USE_FIBERS 1 |
| #endif |
| #else |
| #if KJ_NO_EXCEPTIONS && KJ_USE_FIBERS |
| #error "Fibers cannot be enabled when exceptions are disabled." |
| #endif |
| #endif |
| |
| namespace kj { |
| |
| class EventLoop; |
| class WaitScope; |
| |
| template <typename T> |
| class Promise; |
| template <typename T> |
| class ForkedPromise; |
| template <typename T> |
| class PromiseFulfiller; |
| template <typename T> |
| struct PromiseFulfillerPair; |
| |
| template <typename Func> |
| class FunctionParam; |
| |
| template <typename Func, typename T> |
| using PromiseForResult = _::ReducePromises<_::ReturnType<Func, T>>; |
| // Evaluates to the Promise type for the result of calling functor type Func with parameter type |
| // T. If T is void, then the promise is for the result of calling Func with no arguments. If |
| // Func itself returns a promise, the promises are joined, so you never get Promise<Promise<T>>. |
| |
| // ======================================================================================= |
| // Promises |
| |
| template <typename T> |
| class Promise: protected _::PromiseBase { |
| // The basic primitive of asynchronous computation in KJ. Similar to "futures", but designed |
| // specifically for event loop concurrency. Similar to E promises and JavaScript Promises/A. |
| // |
| // A Promise represents a promise to produce a value of type T some time in the future. Once |
| // that value has been produced, the promise is "fulfilled". Alternatively, a promise can be |
| // "broken", with an Exception describing what went wrong. You may implicitly convert a value of |
| // type T to an already-fulfilled Promise<T>. You may implicitly convert the constant |
| // `kj::READY_NOW` to an already-fulfilled Promise<void>. You may also implicitly convert a |
| // `kj::Exception` to an already-broken promise of any type. |
| // |
| // Promises are linear types -- they are moveable but not copyable. If a Promise is destroyed |
| // or goes out of scope (without being moved elsewhere), any ongoing asynchronous operations |
| // meant to fulfill the promise will be canceled if possible. All methods of `Promise` (unless |
| // otherwise noted) actually consume the promise in the sense of move semantics. (Arguably they |
| // should be rvalue-qualified, but at the time this interface was created compilers didn't widely |
| // support that yet and anyway it would be pretty ugly typing kj::mv(promise).whatever().) If |
| // you want to use one Promise in two different places, you must fork it with `fork()`. |
| // |
| // To use the result of a Promise, you must call `then()` and supply a callback function to |
| // call with the result. `then()` returns another promise, for the result of the callback. |
| // Any time that this would result in Promise<Promise<T>>, the promises are collapsed into a |
| // simple Promise<T> that first waits for the outer promise, then the inner. Example: |
| // |
| // // Open a remote file, read the content, and then count the |
| // // number of lines of text. |
| // // Note that none of the calls here block. `file`, `content` |
| // // and `lineCount` are all initialized immediately before any |
| // // asynchronous operations occur. The lambda callbacks are |
| // // called later. |
| // Promise<Own<File>> file = openFtp("ftp://host/foo/bar"); |
| // Promise<String> content = file.then( |
| // [](Own<File> file) -> Promise<String> { |
| // return file->readAll(); |
| // }); |
| // Promise<int> lineCount = content.then( |
| // [](String text) -> int { |
| // uint count = 0; |
| // for (char c: text) count += (c == '\n'); |
| // return count; |
| // }); |
| // |
| // For `then()` to work, the current thread must have an active `EventLoop`. Each callback |
| // is scheduled to execute in that loop. Since `then()` schedules callbacks only on the current |
| // thread's event loop, you do not need to worry about two callbacks running at the same time. |
| // You will need to set up at least one `EventLoop` at the top level of your program before you |
| // can use promises. |
| // |
| // To adapt a non-Promise-based asynchronous API to promises, use `newAdaptedPromise()`. |
| // |
| // Systems using promises should consider supporting the concept of "pipelining". Pipelining |
| // means allowing a caller to start issuing method calls against a promised object before the |
| // promise has actually been fulfilled. This is particularly useful if the promise is for a |
| // remote object living across a network, as this can avoid round trips when chaining a series |
| // of calls. It is suggested that any class T which supports pipelining implement a subclass of |
| // Promise<T> which adds "eventual send" methods -- methods which, when called, say "please |
| // invoke the corresponding method on the promised value once it is available". These methods |
| // should in turn return promises for the eventual results of said invocations. Cap'n Proto, |
| // for example, implements the type `RemotePromise` which supports pipelining RPC requests -- see |
| // `capnp/capability.h`. |
| // |
| // KJ Promises are based on E promises: |
| // http://wiki.erights.org/wiki/Walnut/Distributed_Computing#Promises |
| // |
| // KJ Promises are also inspired in part by the evolving standards for JavaScript/ECMAScript |
| // promises, which are themselves influenced by E promises: |
| // http://promisesaplus.com/ |
| // https://github.com/domenic/promises-unwrapping |
| |
| public: |
| Promise(_::FixVoid<T> value); |
| // Construct an already-fulfilled Promise from a value of type T. For non-void promises, the |
| // parameter type is simply T. So, e.g., in a function that returns `Promise<int>`, you can |
| // say `return 123;` to return a promise that is already fulfilled to 123. |
| // |
| // For void promises, use `kj::READY_NOW` as the value, e.g. `return kj::READY_NOW`. |
| |
| Promise(kj::Exception&& e); |
| // Construct an already-broken Promise. |
| |
| inline Promise(decltype(nullptr)) {} |
| |
| template <typename Func, typename ErrorFunc = _::PropagateException> |
| PromiseForResult<Func, T> then(Func&& func, ErrorFunc&& errorHandler = _::PropagateException(), |
| SourceLocation location = {}) KJ_WARN_UNUSED_RESULT; |
| // Register a continuation function to be executed when the promise completes. The continuation |
| // (`func`) takes the promised value (an rvalue of type `T`) as its parameter. The continuation |
| // may return a new value; `then()` itself returns a promise for the continuation's eventual |
| // result. If the continuation itself returns a `Promise<U>`, then `then()` shall also return |
| // a `Promise<U>` which first waits for the original promise, then executes the continuation, |
| // then waits for the inner promise (i.e. it automatically "unwraps" the promise). |
| // |
| // In all cases, `then()` returns immediately. The continuation is executed later. The |
| // continuation is always executed on the same EventLoop (and, therefore, the same thread) which |
| // called `then()`, therefore no synchronization is necessary on state shared by the continuation |
| // and the surrounding scope. If no EventLoop is running on the current thread, `then()` throws |
| // an exception. |
| // |
| // You may also specify an error handler continuation as the second parameter. `errorHandler` |
| // must be a functor taking a parameter of type `kj::Exception&&`. It must return the same |
| // type as `func` returns (except when `func` returns `Promise<U>`, in which case `errorHandler` |
| // may return either `Promise<U>` or just `U`). The default error handler simply propagates the |
| // exception to the returned promise. |
| // |
| // Either `func` or `errorHandler` may, of course, throw an exception, in which case the promise |
| // is broken. When compiled with -fno-exceptions, the framework will still detect when a |
| // recoverable exception was thrown inside of a continuation and will consider the promise |
| // broken even though a (presumably garbage) result was returned. |
| // |
| // If the returned promise is destroyed before the callback runs, the callback will be canceled |
| // (it will never run). |
| // |
| // Note that `then()` -- like all other Promise methods -- consumes the promise on which it is |
| // called, in the sense of move semantics. After returning, the original promise is no longer |
| // valid, but `then()` returns a new promise. |
| // |
| // *Advanced implementation tips:* Most users will never need to worry about the below, but |
| // it is good to be aware of. |
| // |
| // As an optimization, if the callback function `func` does _not_ return another promise, then |
| // execution of `func` itself may be delayed until its result is known to be needed. The |
| // expectation here is that `func` is just doing some transformation on the results, not |
| // scheduling any other actions, therefore the system doesn't need to be proactive about |
| // evaluating it. This way, a chain of trivial then() transformations can be executed all at |
| // once without repeatedly re-scheduling through the event loop. Use the `eagerlyEvaluate()` |
| // method to suppress this behavior. |
| // |
| // On the other hand, if `func` _does_ return another promise, then the system evaluates `func` |
| // as soon as possible, because the promise it returns might be for a newly-scheduled |
| // long-running asynchronous task. |
| // |
| // As another optimization, when a callback function registered with `then()` is actually |
| // scheduled, it is scheduled to occur immediately, preempting other work in the event queue. |
| // This allows a long chain of `then`s to execute all at once, improving cache locality by |
| // clustering operations on the same data. However, this implies that starvation can occur |
| // if a chain of `then()`s takes a very long time to execute without ever stopping to wait for |
| // actual I/O. To solve this, use `kj::evalLater()` to yield control; this way, all other events |
| // in the queue will get a chance to run before your callback is executed. |
| |
| Promise<void> ignoreResult() KJ_WARN_UNUSED_RESULT { return then([](T&&) {}); } |
| // Convenience method to convert the promise to a void promise by ignoring the return value. |
| // |
| // You must still wait on the returned promise if you want the task to execute. |
| |
| template <typename ErrorFunc> |
| Promise<T> catch_(ErrorFunc&& errorHandler, SourceLocation location = {}) KJ_WARN_UNUSED_RESULT; |
| // Equivalent to `.then(identityFunc, errorHandler)`, where `identityFunc` is a function that |
| // just returns its input. |
| |
| T wait(WaitScope& waitScope, SourceLocation location = {}); |
| // Run the event loop until the promise is fulfilled, then return its result. If the promise |
| // is rejected, throw an exception. |
| // |
| // wait() is primarily useful at the top level of a program -- typically, within the function |
| // that allocated the EventLoop. For example, a program that performs one or two RPCs and then |
| // exits would likely use wait() in its main() function to wait on each RPC. On the other hand, |
| // server-side code generally cannot use wait(), because it has to be able to accept multiple |
| // requests at once. |
| // |
| // If the promise is rejected, `wait()` throws an exception. If the program was compiled without |
| // exceptions (-fno-exceptions), this will usually abort. In this case you really should first |
| // use `then()` to set an appropriate handler for the exception case, so that the promise you |
| // actually wait on never throws. |
| // |
| // `waitScope` is an object proving that the caller is in a scope where wait() is allowed. By |
| // convention, any function which might call wait(), or which might call another function which |
| // might call wait(), must take `WaitScope&` as one of its parameters. This is needed for two |
| // reasons: |
| // * `wait()` is not allowed during an event callback, because event callbacks are themselves |
| // called during some other `wait()`, and such recursive `wait()`s would only be able to |
| // complete in LIFO order, which might mean that the outer `wait()` ends up waiting longer |
| // than it is supposed to. To prevent this, a `WaitScope` cannot be constructed or used during |
| // an event callback. |
| // * Since `wait()` runs the event loop, unrelated event callbacks may execute before `wait()` |
| // returns. This means that anyone calling `wait()` must be reentrant -- state may change |
| // around them in arbitrary ways. Therefore, callers really need to know if a function they |
| // are calling might wait(), and the `WaitScope&` parameter makes this clear. |
| // |
| // Usually, there is only one `WaitScope` for each `EventLoop`, and it can only be used at the |
| // top level of the thread owning the loop. Calling `wait()` with this `WaitScope` is what |
| // actually causes the event loop to run at all. This top-level `WaitScope` cannot be used |
| // recursively, so cannot be used within an event callback. |
| // |
| // However, it is possible to obtain a `WaitScope` in lower-level code by using fibers. Use |
| // kj::startFiber() to start some code executing on an alternate call stack. That code will get |
| // its own `WaitScope` allowing it to operate in a synchronous style. In this case, `wait()` |
| // switches back to the main stack in order to run the event loop, returning to the fiber's stack |
| // once the awaited promise resolves. |
| |
| bool poll(WaitScope& waitScope, SourceLocation location = {}); |
| // Returns true if a call to wait() would complete without blocking, false if it would block. |
| // |
| // If the promise is not yet resolved, poll() will pump the event loop and poll for I/O in an |
| // attempt to resolve it. Only when there is nothing left to do will it return false. |
| // |
| // Generally, poll() is most useful in tests. Often, you may want to verify that a promise does |
| // not resolve until some specific event occurs. To do so, poll() the promise before the event to |
| // verify it isn't resolved, then trigger the event, then poll() again to verify that it resolves. |
| // The first poll() verifies that the promise doesn't resolve early, which would otherwise be |
| // hard to do deterministically. The second poll() allows you to check that the promise has |
| // resolved and avoid a wait() that might deadlock in the case that it hasn't. |
| // |
| // poll() is not supported in fibers; it will throw an exception. |
| |
| ForkedPromise<T> fork(SourceLocation location = {}) KJ_WARN_UNUSED_RESULT; |
| // Forks the promise, so that multiple different clients can independently wait on the result. |
| // `T` must be copy-constructible for this to work. Or, in the special case where `T` is |
| // `Own<U>`, `U` must have a method `Own<U> addRef()` which returns a new reference to the same |
| // (or an equivalent) object (probably implemented via reference counting). |
| |
| _::SplitTuplePromise<T> split(SourceLocation location = {}); |
| // Split a promise for a tuple into a tuple of promises. |
| // |
| // E.g. if you have `Promise<kj::Tuple<T, U>>`, `split()` returns |
| // `kj::Tuple<Promise<T>, Promise<U>>`. |
| |
| Promise<T> exclusiveJoin(Promise<T>&& other, SourceLocation location = {}) KJ_WARN_UNUSED_RESULT; |
| // Return a new promise that resolves when either the original promise resolves or `other` |
| // resolves (whichever comes first). The promise that didn't resolve first is canceled. |
| |
| // TODO(someday): inclusiveJoin(), or perhaps just join(), which waits for both completions |
| // and produces a tuple? |
| |
| template <typename... Attachments> |
| Promise<T> attach(Attachments&&... attachments) KJ_WARN_UNUSED_RESULT; |
| // "Attaches" one or more movable objects (often, Own<T>s) to the promise, such that they will |
| // be destroyed when the promise resolves. This is useful when a promise's callback contains |
| // pointers into some object and you want to make sure the object still exists when the callback |
| // runs -- after calling then(), use attach() to add necessary objects to the result. |
| |
| template <typename ErrorFunc> |
| Promise<T> eagerlyEvaluate(ErrorFunc&& errorHandler, SourceLocation location = {}) |
| KJ_WARN_UNUSED_RESULT; |
| Promise<T> eagerlyEvaluate(decltype(nullptr), SourceLocation location = {}) KJ_WARN_UNUSED_RESULT; |
| // Force eager evaluation of this promise. Use this if you are going to hold on to the promise |
| // for a while without consuming the result, but you want to make sure that the system actually |
| // processes it. |
| // |
| // `errorHandler` is a function that takes `kj::Exception&&`, like the second parameter to |
| // `then()`, or the parameter to `catch_()`. We make you specify this because otherwise it's |
| // easy to forget to handle errors in a promise that you never use. You may specify nullptr for |
| // the error handler if you are sure that ignoring errors is fine, or if you know that you'll |
| // eventually wait on the promise somewhere. |
| |
| template <typename ErrorFunc> |
| void detach(ErrorFunc&& errorHandler); |
| // Allows the promise to continue running in the background until it completes or the |
| // `EventLoop` is destroyed. Be careful when using this: since you can no longer cancel this |
| // promise, you need to make sure that the promise owns all the objects it touches or make sure |
| // those objects outlive the EventLoop. |
| // |
| // `errorHandler` is a function that takes `kj::Exception&&`, like the second parameter to |
| // `then()`, except that it must return void. |
| // |
| // This function exists mainly to implement the Cap'n Proto requirement that RPC calls cannot be |
| // canceled unless the callee explicitly permits it. |
| |
| kj::String trace(); |
| // Returns a dump of debug info about this promise. Not for production use. Requires RTTI. |
| // This method does NOT consume the promise as other methods do. |
| |
| private: |
| Promise(bool, Own<_::PromiseNode>&& node): PromiseBase(kj::mv(node)) {} |
| // The unused first (bool) parameter prevents ambiguity with the immediate-value constructor. |
| |
| friend class _::PromiseNode; |
| }; |
| |
| template <typename T> |
| class ForkedPromise { |
| // The result of `Promise::fork()` and `EventLoop::fork()`. Call `addBranch()` once for each |
| // consumer that needs the result. Like `Promise<T>`, this is a pass-by-move type. |
| |
| public: |
| inline ForkedPromise(decltype(nullptr)) {} |
| |
| Promise<T> addBranch(); |
| // Add a new branch to the fork. The branch is equivalent to the original promise. |
| |
| bool hasBranches(); |
| // Returns true if there are any branches that haven't been canceled. |
| |
| private: |
| Own<_::ForkHub<_::FixVoid<T>>> hub; |
| |
| inline ForkedPromise(bool, Own<_::ForkHub<_::FixVoid<T>>>&& hub): hub(kj::mv(hub)) {} |
| |
| friend class Promise<T>; |
| friend class EventLoop; |
| }; |
| |
| constexpr _::Void READY_NOW = _::Void(); |
| // Use this when you need a Promise<void> that is already fulfilled -- this value can be implicitly |
| // converted to `Promise<void>`. |
| |
| constexpr _::NeverDone NEVER_DONE = _::NeverDone(); |
| // The opposite of `READY_NOW`: return this when the promise should never resolve. This can be |
| // implicitly converted to any promise type. You may also call `NEVER_DONE.wait()` to wait |
| // forever (useful for servers). |
| |
| template <typename Func> |
| PromiseForResult<Func, void> evalLater(Func&& func) KJ_WARN_UNUSED_RESULT; |
| // Schedule for the given zero-parameter function to be executed in the event loop at some |
| // point in the near future. Returns a Promise for its result -- or, if `func()` itself returns |
| // a promise, `evalLater()` returns a Promise for the result of resolving that promise. |
| // |
| // Example usage: |
| // Promise<int> x = evalLater([]() { return 123; }); |
| // |
| // The above is exactly equivalent to: |
| // Promise<int> x = Promise<void>(READY_NOW).then([]() { return 123; }); |
| // |
| // If the returned promise is destroyed before the callback runs, the callback will be canceled |
| // (never called). |
| // |
| // If you schedule several evaluations with `evalLater` during the same callback, they are |
| // guaranteed to be executed in order. |
| |
| template <typename Func> |
| PromiseForResult<Func, void> evalNow(Func&& func) KJ_WARN_UNUSED_RESULT; |
| // Run `func()` and return a promise for its result. `func()` executes before `evalNow()` returns. |
| // If `func()` throws an exception, the exception is caught and wrapped in a promise -- this is the |
| // main reason why `evalNow()` is useful. |
| |
| template <typename Func> |
| PromiseForResult<Func, void> evalLast(Func&& func) KJ_WARN_UNUSED_RESULT; |
| // Like `evalLater()`, except that the function doesn't run until the event queue is otherwise |
| // completely empty and the thread is about to suspend waiting for I/O. |
| // |
| // This is useful when you need to perform some disruptive action and you want to make sure that |
| // you don't interrupt some other task between two .then() continuations. For example, say you want |
| // to cancel a read() operation on a socket and know for sure that if any bytes were read, you saw |
| // them. It could be that a read() has completed and bytes have been transferred to the target |
| // buffer, but the .then() callback that handles the read result hasn't executed yet. If you |
| // cancel the promise at this inopportune moment, the bytes in the buffer are lost. If you do |
| // evalLast(), then you can be sure that any pending .then() callbacks had a chance to finish out |
| // and if you didn't receive the read result yet, then you know nothing has been read, and you can |
| // simply drop the promise. |
| // |
| // If evalLast() is called multiple times, functions are executed in LIFO order. If the first |
| // callback enqueues new events, then later callbacks will not execute until those events are |
| // drained. |
| |
| ArrayPtr<void* const> getAsyncTrace(ArrayPtr<void*> space); |
| kj::String getAsyncTrace(); |
| // If the event loop is currently running in this thread, get a trace back through the promise |
| // chain leading to the currently-executing event. The format is the same as kj::getStackTrace() |
| // from exception.c++. |
| |
| template <typename Func> |
| PromiseForResult<Func, void> retryOnDisconnect(Func&& func) KJ_WARN_UNUSED_RESULT; |
| // Promises to run `func()` asynchronously, retrying once if it fails with a DISCONNECTED exception. |
| // If the retry also fails, the exception is passed through. |
| // |
| // `func()` should return a `Promise`. `retryOnDisconnect(func)` returns the same promise, except |
| // with the retry logic added. |
| |
| template <typename Func> |
| PromiseForResult<Func, WaitScope&> startFiber( |
| size_t stackSize, Func&& func, SourceLocation location = {}) KJ_WARN_UNUSED_RESULT; |
| // Executes `func()` in a fiber, returning a promise for the eventual result. `func()` will be |
| // passed a `WaitScope&` as its parameter, allowing it to call `.wait()` on promises. Thus, `func()` |
| // can be written in a synchronous, blocking style, instead of using `.then()`. This is often much |
| // easier to write and read, and may even be significantly faster if it allows the use of stack |
| // allocation rather than heap allocation. |
| // |
| // However, fibers have a major disadvantage: memory must be allocated for the fiber's call stack. |
| // The entire stack must be allocated at once, making it necessary to choose a stack size upfront |
| // that is big enough for whatever the fiber needs to do. Estimating this is often difficult. That |
| // said, over-estimating is not too terrible since pages of the stack will actually be allocated |
| // lazily when first accessed; actual memory usage will correspond to the "high watermark" of the |
| // actual stack usage. However, this lazy allocation forces page faults, which can be quite slow. |
| // Worse, freeing a stack forces a TLB flush and shootdown -- all currently-executing threads will |
| // have to be interrupted to flush their CPU cores' TLB caches. |
| // |
| // In short, when performance matters, you should try to avoid creating fibers very frequently. |
| |
| class FiberPool final { |
| // A freelist pool of fibers with a set stack size. This improves CPU usage with fibers at |
| // the expense of memory usage. Fibers in this pool will always use the max amount of memory |
| // used until the pool is destroyed. |
| |
| public: |
| explicit FiberPool(size_t stackSize); |
| ~FiberPool() noexcept(false); |
| KJ_DISALLOW_COPY(FiberPool); |
| |
| void setMaxFreelist(size_t count); |
| // Set the maximum number of stacks to add to the freelist. If the freelist is full, stacks will |
| // be deleted rather than returned to the freelist. |
| |
| void useCoreLocalFreelists(); |
| // EXPERIMENTAL: Call to tell FiberPool to try to use core-local stack freelists, which |
| // in theory should increase L1/L2 cache efficacy for freelisted stacks. In practice, as of |
| // this writing, no performance advantage has yet been demonstrated. Note that currently this |
| // feature is only supported on Linux (the flag has no effect on other operating systems). |
| |
| template <typename Func> |
| PromiseForResult<Func, WaitScope&> startFiber( |
| Func&& func, SourceLocation location = {}) const KJ_WARN_UNUSED_RESULT; |
| // Executes `func()` in a fiber from this pool, returning a promise for the eventual result. |
| // `func()` will be passed a `WaitScope&` as its parameter, allowing it to call `.wait()` on |
| // promises. Thus, `func()` can be written in a synchronous, blocking style, instead of |
| // using `.then()`. This is often much easier to write and read, and may even be significantly |
| // faster if it allows the use of stack allocation rather than heap allocation. |
| |
| void runSynchronously(kj::FunctionParam<void()> func) const; |
| // Use one of the stacks in the pool to synchronously execute func(). Nothing is returned, since |
| // func is a `void()` functor (see TODO below). This is not the usual use case for fibers, but can |
| // be a nice optimization in programs that have many threads that mostly only need small stacks, |
| // but occasionally need a much bigger stack to run some deeply recursive algorithm. If the |
| // algorithm is run on each thread's normal call stack, then every thread's stack will tend to |
| // grow to be very big (usually, stacks automatically grow as needed, but do not shrink until the |
| // thread exits completely). If the threads can share a small set of big stacks that they use |
| // only when calling the deeply recursive algorithm, and use small stacks for everything else, |
| // overall memory usage is reduced. |
| // |
| // TODO(someday): If func() returns a value, return it from runSynchronously? Current use case |
| // doesn't need it. |
| |
| size_t getFreelistSize() const; |
| // Get the number of stacks currently in the freelist. Does not count stacks that are active. |
| |
| private: |
| class Impl; |
| Own<Impl> impl; |
| |
| friend class _::FiberStack; |
| friend class _::FiberBase; |
| }; |
| |
| template <typename T> |
| Promise<Array<T>> joinPromises(Array<Promise<T>>&& promises, SourceLocation location = {}); |
| // Join an array of promises into a promise for an array. |
| |
| // ======================================================================================= |
| // Hack for creating a lambda that holds an owned pointer. |
| |
| template <typename Func, typename MovedParam> |
| class CaptureByMove { |
|   // Functor that owns both a callable (`func`) and a value (`param`) that was moved into it. |
|   // Invoking the functor calls `func` with `kj::mv(param)` as the first argument, followed by |
|   // whatever arguments the caller supplied. Normally created via `mvCapture()`. |
| |
| public: |
|   inline CaptureByMove(Func&& wrappedFunc, MovedParam&& capturedParam) |
|       : func(kj::mv(wrappedFunc)), param(kj::mv(capturedParam)) {} |
| |
|   template <typename... Args> |
|   inline auto operator()(Args&&... args) |
|       -> decltype(kj::instance<Func>()(kj::instance<MovedParam&&>(), kj::fwd<Args>(args)...)) { |
|     // The captured value always goes first; the caller's arguments are forwarded after it. |
|     return func(kj::mv(param), kj::fwd<Args>(args)...); |
|   } |
| |
| private: |
|   Func func; |
|   MovedParam param; |
| }; |
| |
| template <typename Func, typename MovedParam> |
| inline CaptureByMove<Func, Decay<MovedParam>> mvCapture(MovedParam&& param, Func&& func) { |
|   // Builds a lambda-like functor that takes over `param` by moving it (rather than copying or |
|   // referencing it) and hands it to `func` as the first argument. C++14 generalized captures |
|   // should make this obsolete, but for now in C++11 it is commonly needed for Promise |
|   // continuations that own their state. Example usage: |
|   // |
|   //     Own<Foo> ptr = makeFoo(); |
|   //     Promise<int> promise = callRpc(); |
|   //     promise.then(mvCapture(ptr, [](Own<Foo>&& ptr, int result) { |
|   //       return ptr->finish(result); |
|   //     })); |
| |
|   using Wrapper = CaptureByMove<Func, Decay<MovedParam>>; |
|   return Wrapper(kj::fwd<Func>(func), kj::mv(param)); |
| } |
| |
| // ======================================================================================= |
| // Advanced promise construction |
| |
| class PromiseRejector { |
| // Superclass of PromiseFulfiller containing the non-typed methods, for code that only needs to |
| // reject promises of differing types. NOTE(review): no virtual destructor is declared -- confirm. |
| public: |
| virtual void reject(Exception&& exception) = 0; |
| virtual bool isWaiting() = 0; |
| }; |
| |
| template <typename T> |
| class PromiseFulfiller: public PromiseRejector { |
|   // A callback which can be used to fulfill a promise. Only the first call to fulfill() or |
|   // reject() matters; subsequent calls are ignored. |
| |
| public: |
|   virtual void fulfill(T&& value) = 0; |
|   // Fulfill the promise with the given value. |
| |
|   virtual void reject(Exception&& exception) override = 0; |
|   // Reject the promise with an error. Re-declares PromiseRejector::reject() (still pure) so the |
|   // whole fulfiller interface is visible here; `override` makes the compiler verify that the |
|   // signature keeps matching the base class. |
| |
|   virtual bool isWaiting() override = 0; |
|   // Returns true if the promise is still unfulfilled and someone is potentially waiting for it. |
|   // Returns false if fulfill()/reject() has already been called *or* if the promise to be |
|   // fulfilled has been discarded and therefore the result will never be used anyway. |
|   // Re-declares PromiseRejector::isWaiting(). |
| |
|   template <typename Func> |
|   bool rejectIfThrows(Func&& func); |
|   // Call the function (with no arguments) and return true. If an exception is thrown, call |
|   // `fulfiller.reject()` and then return false. When compiled with exceptions disabled, |
|   // non-fatal exceptions are still detected and handled correctly. |
| }; |
| |
| template <> |
| class PromiseFulfiller<void>: public PromiseRejector { |
|   // Specialization of PromiseFulfiller for void promises. See PromiseFulfiller<T>. |
| |
| public: |
|   virtual void fulfill(_::Void&& value = _::Void()) = 0; |
|   // Call with zero parameters. The parameter is a dummy that only exists so that subclasses don't |
|   // have to specialize for <void>. |
| |
|   virtual void reject(Exception&& exception) override = 0; |
|   virtual bool isWaiting() override = 0; |
|   // Re-declarations of the PromiseRejector methods; `override` makes the compiler verify that the |
|   // signatures keep matching the base class. |
| |
|   template <typename Func> |
|   bool rejectIfThrows(Func&& func); |
|   // Same contract as PromiseFulfiller<T>::rejectIfThrows(). |
| }; |
| |
| template <typename T, typename Adapter, typename... Params> |
| _::ReducePromises<T> newAdaptedPromise(Params&&... adapterConstructorParams); |
| // Creates a new promise which owns an instance of `Adapter`, which in turn encapsulates the |
| // operation that will eventually fulfill the promise. This is primarily useful for adapting |
| // non-KJ asynchronous APIs to use promises. |
| // |
| // An instance of `Adapter` will be allocated and owned by the returned `Promise`. A |
| // `PromiseFulfiller<T>&` will be passed as the first parameter to the adapter's constructor, |
| // and `adapterConstructorParams` will be forwarded as the subsequent parameters. The adapter |
| // is expected to perform some asynchronous operation and then call `fulfill()` or `reject()` |
| // on that `PromiseFulfiller<T>` once it is finished. |
| // |
| // The adapter is destroyed when its owning Promise is destroyed. This may occur before the |
| // Promise has been fulfilled. In this case, the adapter's destructor should cancel the |
| // asynchronous operation. Once the adapter is destroyed, the fulfillment callback cannot be |
| // called. |
| // |
| // An adapter implementation should be carefully written to ensure that the promise cannot |
| // accidentally be left unfulfilled permanently because of an exception. Consider making liberal |
| // use of `PromiseFulfiller<T>::rejectIfThrows()`. |
| |
| template <typename T> |
| struct PromiseFulfillerPair { |
| _::ReducePromises<T> promise; |
| Own<PromiseFulfiller<T>> fulfiller; |
| }; |
| |
| template <typename T> |
| PromiseFulfillerPair<T> newPromiseAndFulfiller(SourceLocation location = {}); |
| // Construct a Promise and a separate PromiseFulfiller which can be used to fulfill the promise. |
| // If the PromiseFulfiller is destroyed before `fulfill()` or `reject()` has been called, the |
| // Promise is implicitly rejected. |
| // |
| // Although this function is easier to use than `newAdaptedPromise()`, it has the serious drawback |
| // that there is no way to handle cancellation (i.e. detect when the Promise is discarded). |
| // |
| // You can arrange to fulfill a promise with another promise by using a promise type for T. E.g. |
| // `newPromiseAndFulfiller<Promise<U>>()` will produce a promise of type `Promise<U>` but the |
| // fulfiller will be of type `PromiseFulfiller<Promise<U>>`. Thus you pass a `Promise<U>` to the |
| // `fulfill()` callback, and the promises are chained. |
| |
| template <typename T> |
| class CrossThreadPromiseFulfiller: public kj::PromiseFulfiller<T> { |
| // Like PromiseFulfiller<T> but the methods are `const`, indicating they can safely be called |
| // from another thread. The inherited non-const methods simply forward to the const overloads. |
| |
| public: |
| virtual void fulfill(T&& value) const = 0; |
| virtual void reject(Exception&& exception) const = 0; |
| virtual bool isWaiting() const = 0; |
| |
| void fulfill(T&& value) override { return constThis()->fulfill(kj::fwd<T>(value)); } |
| void reject(Exception&& exception) override { return constThis()->reject(kj::mv(exception)); } |
| bool isWaiting() override { return constThis()->isWaiting(); } |
| |
| private: |
| const CrossThreadPromiseFulfiller* constThis() { return this; } |
| }; |
| |
| template <> |
| class CrossThreadPromiseFulfiller<void>: public kj::PromiseFulfiller<void> { |
| // Specialization of CrossThreadPromiseFulfiller for void promises, where fulfill() takes a |
| // dummy `_::Void` (defaulted on the const overload). See CrossThreadPromiseFulfiller<T>. |
| |
| public: |
| virtual void fulfill(_::Void&& value = _::Void()) const = 0; |
| virtual void reject(Exception&& exception) const = 0; |
| virtual bool isWaiting() const = 0; |
| |
| void fulfill(_::Void&& value) override { return constThis()->fulfill(kj::mv(value)); } |
| void reject(Exception&& exception) override { return constThis()->reject(kj::mv(exception)); } |
| bool isWaiting() override { return constThis()->isWaiting(); } |
| |
| private: |
| const CrossThreadPromiseFulfiller* constThis() { return this; } |
| }; |
| |
| template <typename T> |
| struct PromiseCrossThreadFulfillerPair { |
| _::ReducePromises<T> promise; |
| Own<CrossThreadPromiseFulfiller<T>> fulfiller; |
| }; |
| |
| template <typename T> |
| PromiseCrossThreadFulfillerPair<T> newPromiseAndCrossThreadFulfiller(); |
| // Like `newPromiseAndFulfiller()`, but the fulfiller is allowed to be invoked from any thread, |
| // not just the one that called this function. Note that the Promise is still tied to the |
| // calling thread's event loop and *cannot* be used from another thread -- only the |
| // CrossThreadPromiseFulfiller is cross-thread. |
| |
| // ======================================================================================= |
| // Canceler |
| |
| class Canceler { |
| // A Canceler can wrap some set of Promises (via wrap()) and then forcefully cancel them |
| // on-demand, or implicitly when the Canceler is destroyed. |
| // |
| // The cancellation is done in such a way that once cancel() (or the Canceler's destructor) |
| // returns, it's guaranteed that the promise has already been canceled and destroyed. This |
| // guarantee is important for enforcing ownership constraints. For example, imagine that Alice |
| // calls a method on Bob that returns a Promise. That Promise encapsulates a task that uses Bob's |
| // internal state. But, imagine that Alice does not own Bob, and indeed Bob might be destroyed |
| // at random without Alice having canceled the promise. In this case, it is necessary for Bob to |
| // ensure that the promise will be forcefully canceled. Bob can do this by constructing a |
| // Canceler and using it to wrap promises before returning them to callers. When Bob is |
| // destroyed, the Canceler is destroyed too, and all promises Bob wrapped with it throw errors. |
| // |
| // Note that another common strategy for cancellation is to use exclusiveJoin() to join a promise |
| // with some "cancellation promise" which only resolves if the operation should be canceled. The |
| // cancellation promise could itself be created by newPromiseAndFulfiller<void>(), and thus |
| // calling the PromiseFulfiller cancels the operation. There is a major problem with this |
| // approach: upon invoking the fulfiller, an arbitrary amount of time may pass before the |
| // exclusive-joined promise actually resolves and cancels its other fork. During that time, the |
| // task might continue to execute. If it holds pointers to objects that have been destroyed, this |
| // might cause segfaults. Thus, it is safer to use a Canceler. |
| |
| public: |
| inline Canceler() {} |
| ~Canceler() noexcept(false); |
| KJ_DISALLOW_COPY(Canceler); |
| |
| template <typename T> |
| Promise<T> wrap(Promise<T> promise) { |
| return newAdaptedPromise<T, AdapterImpl<T>>(*this, kj::mv(promise)); |
| } |
| |
| void cancel(StringPtr cancelReason); |
| void cancel(const Exception& exception); |
| // Cancel all previously-wrapped promises that have not already completed, causing them to throw |
| // the given exception. If you provide just a description message instead of an exception, then |
| // an exception object will be constructed from it -- but only if there are requests to cancel. |
| |
| void release(); |
| // Releases previously-wrapped promises, so that they will not be canceled regardless of what |
| // happens to this Canceler. |
| |
| bool isEmpty() const { return list == nullptr; } |
| // Returns true if no previously-wrapped promises are still executing, i.e. the adapter list is |
| // empty. (If this returns true, then cancel() would be a no-op.) |
| |
| private: |
| class AdapterBase { |
| public: |
| AdapterBase(Canceler& canceler); |
| ~AdapterBase() noexcept(false); |
| |
| virtual void cancel(Exception&& e) = 0; |
| |
| void unlink(); |
| |
| private: |
| Maybe<Maybe<AdapterBase&>&> prev; |
| Maybe<AdapterBase&> next; |
| friend class Canceler; |
| }; |
| |
| template <typename T> |
| class AdapterImpl: public AdapterBase { |
| public: |
| AdapterImpl(PromiseFulfiller<T>& fulfiller, |
| Canceler& canceler, Promise<T> inner) |
| : AdapterBase(canceler), |
| fulfiller(fulfiller), |
| inner(inner.then( |
| [&fulfiller](T&& value) { fulfiller.fulfill(kj::mv(value)); }, |
| [&fulfiller](Exception&& e) { fulfiller.reject(kj::mv(e)); }) |
| .eagerlyEvaluate(nullptr)) {} |
| |
| void cancel(Exception&& e) override { |
| fulfiller.reject(kj::mv(e)); |
| inner = nullptr; |
| } |
| |
| private: |
| PromiseFulfiller<T>& fulfiller; |
| Promise<void> inner; |
| }; |
| |
| Maybe<AdapterBase&> list; |
| }; |
| |
| template <> |
| class Canceler::AdapterImpl<void>: public AdapterBase { |
| public: |
| AdapterImpl(kj::PromiseFulfiller<void>& fulfiller, |
| Canceler& canceler, kj::Promise<void> inner); |
| void cancel(kj::Exception&& e) override; |
| // Declared only; these must be defined in async.c++ to prevent translation units compiled by |
| // MSVC from trying to link with symbols defined in async.c++ merely because they included async.h. |
| |
| private: |
| kj::PromiseFulfiller<void>& fulfiller; |
| kj::Promise<void> inner; |
| }; |
| |
| // ======================================================================================= |
| // TaskSet |
| |
| class TaskSet { |
| // Holds a collection of Promise<void>s and ensures that each executes to completion. Memory |
| // associated with each promise is automatically freed when the promise completes. Destroying |
| // the TaskSet itself automatically cancels all unfinished promises. |
| // |
| // This is useful for "daemon" objects that perform background tasks which aren't intended to |
| // fulfill any particular external promise, but which may need to be canceled (and thus can't |
| // use `Promise::detach()`). The daemon object holds a TaskSet to collect these tasks it is |
| // working on. This way, if the daemon itself is destroyed, the TaskSet is destroyed as well, |
| // and everything the daemon is doing is canceled. |
| |
| public: |
| class ErrorHandler { |
| public: |
| virtual void taskFailed(kj::Exception&& exception) = 0; |
| }; |
| |
| TaskSet(ErrorHandler& errorHandler, SourceLocation location = {}); |
| // `errorHandler` will be executed any time a task throws an exception. NOTE(review): no |
| // EventLoop is passed here; presumably it runs on the current thread's loop -- confirm. |
| |
| ~TaskSet() noexcept(false); |
| |
| void add(Promise<void>&& promise); |
| |
| kj::String trace(); |
| // Return debug info about all promises currently in the TaskSet. |
| |
| bool isEmpty() { return tasks == nullptr; } |
| // Returns true if no tasks are currently held. NOTE(review): could be `const`. |
| |
| Promise<void> onEmpty(); |
| // Returns a promise that fulfills the next time the TaskSet is empty. Only one such promise can |
| // exist at a time. |
| |
| private: |
| class Task; |
| |
| TaskSet::ErrorHandler& errorHandler; |
| Maybe<Own<Task>> tasks; |
| Maybe<Own<PromiseFulfiller<void>>> emptyFulfiller; |
| SourceLocation location; |
| }; |
| |
| // ======================================================================================= |
| // Cross-thread execution. |
| |
| class Executor { |
| // Executes code on another thread's event loop. |
| // |
| // Use `kj::getCurrentThreadExecutor()` to get an executor that schedules calls on the current |
| // thread's event loop. You may then pass the reference to other threads to enable them to call |
| // back to this one. |
| |
| public: |
| Executor(EventLoop& loop, Badge<EventLoop>); |
| ~Executor() noexcept(false); |
| |
| virtual kj::Own<const Executor> addRef() const = 0; |
| // Add a reference to this Executor. The Executor will not be destroyed until all references are |
| // dropped. This uses atomic refcounting for thread-safety. |
| // |
| // Use this when you can't guarantee that the target thread's event loop won't concurrently exit |
| // (including due to an uncaught exception!) while another thread is still using the Executor. |
| // Otherwise, the Executor object is destroyed when the owning event loop exits. |
| // |
| // If the target event loop has exited, then `execute{Async,Sync}` will throw DISCONNECTED |
| // exceptions. |
| |
| bool isLive() const; |
| // Returns true if the remote event loop still exists, false if it has been destroyed. In the |
| // latter case, `execute{Async,Sync}()` will definitely throw. Of course, if this returns true, |
| // it could still change to false at any moment, and `execute{Async,Sync}()` could still throw as |
| // a result. |
| // |
| // TODO(cleanup): Should we have tryExecute{Async,Sync}() that return Maybes that are null if |
| // the remote event loop exited? Currently there are multiple known use cases that check |
| // isLive() after catching a DISCONNECTED exception to decide whether it is due to the executor |
| // exiting, and then handling that case. This is borderline in violation of KJ exception |
| // philosophy, but right now I'm not excited about the extra template metaprogramming needed |
| // for "try" versions... |
| |
| template <typename Func> |
| PromiseForResult<Func, void> executeAsync(Func&& func, SourceLocation location = {}) const; |
| // Call from any thread to request that the given function be executed on the executor's thread, |
| // returning a promise for the result. |
| // |
| // The Promise returned by executeAsync() belongs to the requesting thread, not the executor |
| // thread. Hence, for example, continuations added to this promise with .then() will execute in |
| // the requesting thread. |
| // |
| // If func() itself returns a Promise, that Promise is *not* returned verbatim to the requesting |
| // thread -- after all, Promise objects cannot be used cross-thread. Instead, the executor thread |
| // awaits the promise. Once it resolves to a final result, that result is transferred to the |
| // requesting thread, resolving the promise that executeAsync() returned earlier. |
| // |
| // `func` will be destroyed in the requesting thread, after the final result has been returned |
| // from the executor thread. This means that it is safe for `func` to capture objects that cannot |
| // safely be destroyed from another thread. It is also safe for `func` to be an lvalue reference, |
| // so long as the functor remains live until the promise completes or is canceled, and the |
| // function is thread-safe. |
| // |
| // Of course, the body of `func` must be careful that any access it makes on these objects is |
| // safe cross-thread. For example, it must not attempt to access Promise-related objects |
| // cross-thread; you cannot create a `PromiseFulfiller` in one thread and then `fulfill()` it |
| // from another. Unfortunately, the usual convention of using const-correctness to enforce |
| // thread-safety does not work here, because applications can often ensure that `func` has |
| // exclusive access to captured objects, and thus can safely mutate them even in non-thread-safe |
| // ways; the const qualifier is not sufficient to express this. |
| // |
| // The final return value of `func` is transferred between threads, and hence is constructed and |
| // destroyed in separate threads. It is the app's responsibility to make sure this is OK. |
| // Alternatively, the app can perhaps arrange to send the return value back to the original |
| // thread for destruction, if needed. |
| // |
| // If the requesting thread destroys the returned Promise, the destructor will block waiting for |
| // the executor thread to acknowledge cancellation. This ensures that `func` can be destroyed |
| // before the Promise's destructor returns. |
| // |
| // Multiple calls to executeAsync() from the same requesting thread to the same target thread |
| // will be delivered in the same order in which they were requested. (However, if func() returns |
| // a promise, delivery of subsequent calls is not blocked on that promise. In other words, this |
| // call provides E-Order in the same way as Cap'n Proto.) |
| |
| template <typename Func> |
| _::UnwrapPromise<PromiseForResult<Func, void>> executeSync( |
| Func&& func, SourceLocation location = {}) const; |
| // Schedules `func()` to execute on the executor thread, and then blocks the requesting thread |
| // until `func()` completes. If `func()` returns a Promise, then the wait will continue until |
| // that promise resolves, and the final result will be returned to the requesting thread. |
| // |
| // The requesting thread does not need to have an EventLoop. If it does have an EventLoop, that |
| // loop will *not* execute while the thread is blocked. This method is particularly useful to |
| // allow non-event-loop threads to perform I/O via a separate event-loop thread. |
| // |
| // As with `executeAsync()`, `func` is always destroyed on the requesting thread, after the |
| // executor thread has signaled completion. The return value is transferred between threads. |
| |
| private: |
| struct Impl; |
| Own<Impl> impl; |
| // State lives behind an opaque `Impl` defined elsewhere, to avoid including mutex.h here. |
| |
| friend class EventLoop; |
| friend class _::XThreadEvent; |
| friend class _::XThreadPaf; |
| |
| void send(_::XThreadEvent& event, bool sync) const; |
| void wait(); |
| bool poll(); |
| |
| EventLoop& getLoop() const; |
| }; |
| |
| const Executor& getCurrentThreadExecutor(); |
| // Get the executor for the current thread's event loop. This reference can then be passed to other |
| // threads, which can use it to schedule calls back onto this thread's event loop. |
| |
| // ======================================================================================= |
| // The EventLoop class |
| |
| class EventPort { |
| // Interfaces between an `EventLoop` and events originating from outside of the loop's thread. |
| // All such events come in through the `EventPort` implementation. |
| // |
| // An `EventPort` implementation may interface with low-level operating system APIs and/or other |
| // threads. You can also write an `EventPort` which wraps some other (non-KJ) event loop |
| // framework, allowing the two to coexist in a single thread. |
| |
| public: |
| virtual bool wait() = 0; |
| // Wait for an external event to arrive, sleeping if necessary. Once at least one event has |
| // arrived, queue it to the event loop (e.g. by fulfilling a promise) and return. |
| // |
| // This is called during `Promise::wait()` whenever the event queue becomes empty, in order to |
| // wait for new events to populate the queue. |
| // |
| // It is safe to return even if nothing has actually been queued, so long as calling `wait()` in |
| // a loop will eventually sleep. (That is to say, false positives are fine.) |
| // |
| // Returns true if wake() has been called from another thread. (Precisely, returns true if |
| // no previous call to `wait()` nor `poll()` has returned true since `wake()` was last |
| // called.) |
| |
| virtual bool poll() = 0; |
| // Check if any external events have arrived, but do not sleep. If any events have arrived, |
| // add them to the event queue (e.g. by fulfilling promises) before returning. |
| // |
| // This may be called during `Promise::wait()` when the EventLoop has been executing for a while |
| // without a break but is still non-empty. |
| // |
| // Returns true if wake() has been called from another thread. (Precisely, returns true if |
| // no previous call to `wait()` nor `poll()` has returned true since `wake()` was last |
| // called.) |
| |
| virtual void setRunnable(bool runnable); |
| // Called to notify the `EventPort` when the `EventLoop` has work to do; specifically when it |
| // transitions from empty -> runnable or runnable -> empty. This is typically useful when |
| // integrating with an external event loop; if the loop is currently runnable then you should |
| // arrange to call run() on it soon. The default implementation does nothing. |
| |
| virtual void wake() const; |
| // Wake up the EventPort's thread from another thread. |
| // |
| // Unlike all other methods on this interface, `wake()` may be called from another thread, hence |
| // it is `const`. |
| // |
| // Technically speaking, `wake()` causes the target thread to cease sleeping and not to sleep |
| // again until `wait()` or `poll()` has returned true at least once. |
| // |
| // The default implementation throws an UNIMPLEMENTED exception. |
| }; |
| |
| class EventLoop { |
| // Represents a queue of events being executed in a loop. Most code won't interact with |
| // EventLoop directly, but instead use `Promise`s to interact with it indirectly. See the |
| // documentation for `Promise`. |
| // |
| // Each thread can have at most one current EventLoop. To make an `EventLoop` current for |
| // the thread, create a `WaitScope`. Async APIs require that the thread has a current EventLoop, |
| // or they will throw exceptions. APIs that use `Promise::wait()` additionally must explicitly |
| // be passed a reference to the `WaitScope` to make the caller aware that they might block. |
| // |
| // Generally, you will want to construct an `EventLoop` at the top level of your program, e.g. |
| // in the main() function, or in the start function of a thread. You can then use it to |
| // construct some promises and wait on the result. Example: |
| // |
| // int main() { |
| // // Constructing a `WaitScope` makes `loop` the current EventLoop for the thread. |
| // MyEventPort eventPort; |
| // EventLoop loop(eventPort); |
| // WaitScope waitScope(loop); |
| // // Now we can call an async function. |
| // Promise<String> textPromise = getHttp("http://example.com"); |
| // |
| // // And we can wait for the promise to complete. Note that you can only use `wait()` |
| // // from the top level, not from inside a promise callback. |
| // String text = textPromise.wait(waitScope); |
| // print(text); |
| // return 0; |
| // } |
| // |
| // Most applications that do I/O will prefer to use `setupAsyncIo()` from `async-io.h` rather |
| // than allocate an `EventLoop` directly. |
| |
| public: |
| EventLoop(); |
| // Construct an `EventLoop` which does not receive external events at all. |
| |
| explicit EventLoop(EventPort& port); |
| // Construct an `EventLoop` which receives external events through the given `EventPort`. |
| |
| ~EventLoop() noexcept(false); |
| |
| void run(uint maxTurnCount = maxValue); |
| // Run the event loop for `maxTurnCount` turns or until there is nothing left to be done, |
| // whichever comes first. This never calls the `EventPort`'s `wait()` or `poll()`. It will |
| // call the `EventPort`'s `setRunnable(false)` if the queue becomes empty. |
| |
| bool isRunnable(); |
| // Returns true if run() would currently do anything, or false if the queue is empty. |
| |
| const Executor& getExecutor(); |
| // Returns an Executor that can be used to schedule events on this EventLoop from another thread. |
| // |
| // Use the global function kj::getCurrentThreadExecutor() to get the current thread's EventLoop's |
| // Executor. |
| // |
| // Note that this is only needed for cross-thread scheduling. To schedule code to run later in |
| // the current thread, use `kj::evalLater()`, which will be more efficient. |
| |
| private: |
| kj::Maybe<EventPort&> port; |
| // If null, this thread doesn't receive I/O events from the OS. It can potentially receive |
| // events from other threads via the Executor. |
| |
| bool running = false; |
| // True while looping -- wait() is then not allowed. |
| |
| bool lastRunnableState = false; |
| // What did we last pass to port.setRunnable()? |
| |
| _::Event* head = nullptr; |
| _::Event** tail = &head; |
| _::Event** depthFirstInsertPoint = &head; |
| _::Event** breadthFirstInsertPoint = &head; |
| |
| kj::Maybe<Own<Executor>> executor; |
| // Allocated the first time getExecutor() is requested, making cross-thread requests possible. |
| |
| Own<TaskSet> daemons; |
| |
| _::Event* currentlyFiring = nullptr; |
| |
| bool turn(); |
| void setRunnable(bool runnable); |
| void enterScope(); |
| void leaveScope(); |
| |
| void wait(); |
| void poll(); |
| |
| friend void _::detach(kj::Promise<void>&& promise); |
| friend void _::waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, |
| WaitScope& waitScope, SourceLocation location); |
| friend bool _::pollImpl(_::PromiseNode& node, WaitScope& waitScope, SourceLocation location); |
| friend class _::Event; |
| friend class WaitScope; |
| friend class Executor; |
| friend class _::XThreadEvent; |
| friend class _::XThreadPaf; |
| friend class _::FiberBase; |
| friend class _::FiberStack; |
| friend ArrayPtr<void* const> getAsyncTrace(ArrayPtr<void*> space); |
| }; |
| |
| class WaitScope { |
| // Represents a scope in which asynchronous programming can occur. A `WaitScope` should usually |
| // be allocated on the stack and serves two purposes: |
| // * While the `WaitScope` exists, its `EventLoop` is registered as the current loop for the |
| // thread. Most operations dealing with `Promise` (including all of its methods) do not work |
| // unless the thread has a current `EventLoop`. |
| // * `WaitScope` may be passed to `Promise::wait()` to synchronously wait for a particular |
| // promise to complete. See `Promise::wait()` for an extended discussion. |
| |
| public: |
| inline explicit WaitScope(EventLoop& loop): loop(loop) { loop.enterScope(); } |
| inline ~WaitScope() { if (fiber == nullptr) loop.leaveScope(); } |
| KJ_DISALLOW_COPY(WaitScope); |
| |
| void poll(); |
| // Pumps the event queue and polls for I/O until there's nothing left to do (without blocking). |
| // |
| // Not supported in fibers. |
| |
| void setBusyPollInterval(uint count) { busyPollInterval = count; } |
| // Set the maximum number of events to run in a row before calling poll() on the EventPort to |
| // check for new I/O. The default is kj::maxValue. |
| // |
| // This has no effect when used in a fiber. |
| |
| void runEventCallbacksOnStackPool(kj::Maybe<const FiberPool&> pool) { runningStacksPool = pool; } |
| // Arranges to switch stacks while event callbacks are executing. This is an optimization that |
| // is useful for programs that use extremely high thread counts, where each thread has its own |
| // event loop, but each thread has relatively low event throughput, i.e. each thread spends |
| // most of its time waiting for I/O. Normally, the biggest problem with having lots of threads |
| // is that each thread must allocate a stack, and stacks can take a lot of memory if the |
| // application commonly makes deep calls. But, most of that stack space is only needed while |
| // the thread is executing, not while it's sleeping. So, if threads only switch to a big stack |
| // during execution, switching back when it's time to sleep, and if those stacks are freelisted |
| // so that they can be shared among threads, then a lot of memory is saved. |
| // |
| // We use the `FiberPool` type here because it implements a freelist of stacks, which is exactly |
| // what we happen to want! In our case, though, we don't use those stacks to implement fibers; |
| // we use them as the main thread stack. |
| // |
| // This has no effect if this WaitScope itself is for a fiber. |
| // |
| // Pass `nullptr` as the parameter to go back to running events on the main stack. |
| |
| void cancelAllDetached(); |
| // HACK: Immediately cancel all detached promises. |
| // |
| // New code should not use detached promises, and therefore should not need this. |
| // |
| // This method exists to help existing code deal with the problems of detached promises, |
| // especially at teardown time. |
| // |
| // This method may be removed in the future. |
| |
| private: |
| EventLoop& loop; |
| uint busyPollInterval = kj::maxValue; |
| |
| kj::Maybe<_::FiberBase&> fiber; |
| kj::Maybe<const FiberPool&> runningStacksPool; |
| |
| explicit WaitScope(EventLoop& loop, _::FiberBase& fiber) |
| : loop(loop), fiber(fiber) {} |
| |
| template <typename Func> |
| inline void runOnStackPool(Func&& func) { |
| KJ_IF_MAYBE(pool, runningStacksPool) { |
| pool->runSynchronously(kj::fwd<Func>(func)); |
| } else { |
| func(); |
| } |
| } |
| |
| friend class EventLoop; |
| friend class _::FiberBase; |
| friend void _::waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, |
| WaitScope& waitScope, SourceLocation location); |
| friend bool _::pollImpl(_::PromiseNode& node, WaitScope& waitScope, SourceLocation location); |
| }; |
| |
| } // namespace kj |
| |
| #define KJ_ASYNC_H_INCLUDED |
| #include "async-inl.h" |
| |
| KJ_END_HEADER |