blob: 8e6f84c11ae40d194c8aec9066bcadd271a1eda6 [file] [log] [blame]
// Copyright (c) 2021 Cloudflare, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#pragma once
#include "async.h"
#include "common.h"
#include "debug.h"
#include "list.h"
#include "memory.h"
#include <list>
KJ_BEGIN_HEADER
namespace kj {
template <typename T>
class WaiterQueue {
public:
// A WaiterQueue creates Nodes that blend newAdaptedPromise<T, Adaptor> and List<Node>.
WaiterQueue() = default;
KJ_DISALLOW_COPY(WaiterQueue);
Promise<T> wait() {
return newAdaptedPromise<T, Node>(queue);
}
void fulfill(T&& value) {
KJ_IREQUIRE(!empty());
auto& node = static_cast<Node&>(queue.front());
node.fulfiller.fulfill(kj::mv(value));
node.remove();
}
void reject(Exception&& exception) {
KJ_IREQUIRE(!empty());
auto& node = static_cast<Node&>(queue.front());
node.fulfiller.reject(kj::mv(exception));
node.remove();
}
bool empty() const {
return queue.empty();
}
private:
struct BaseNode {
// This is a separate structure because List requires a predefined memory layout but
// newAdaptedPromise() only provides access to the Adaptor type in the ctor.
BaseNode(PromiseFulfiller<T>& fulfiller): fulfiller(fulfiller) {}
PromiseFulfiller<T>& fulfiller;
ListLink<BaseNode> link;
};
using Queue = List<BaseNode, &BaseNode::link>;
struct Node: public BaseNode {
Node(PromiseFulfiller<T>& fulfiller, Queue& queue): BaseNode(fulfiller), queue(queue) {
queue.add(*this);
}
~Node() noexcept(false) {
// When the associated Promise is destructed, so is the Node thus we should leave the queue.
remove();
}
void remove() {
if(BaseNode::link.isLinked()){
queue.remove(*this);
}
}
Queue& queue;
};
Queue queue;
};
template <typename T>
class ProducerConsumerQueue {
// ProducerConsumerQueue is an async FIFO queue.
public:
void push(T v) {
// Push an existing value onto the queue.
if (!waiters.empty()) {
// We have at least one waiter, give the value to the oldest.
KJ_IASSERT(values.empty());
// Fulfill the first waiter and return without store our value.
waiters.fulfill(kj::mv(v));
} else {
// We don't have any waiters, store the value.
values.push_front(kj::mv(v));
}
}
void rejectAll(Exception e) {
// Reject all waiters with a given exception.
while (!waiters.empty()) {
auto newE = Exception(e);
waiters.reject(kj::mv(newE));
}
}
Promise<T> pop() {
// Eventually pop a value from the queue.
// Note that if your sinks lag your sources, the promise will always be ready.
if (!values.empty()) {
// We have at least one value, get the oldest.
KJ_IASSERT(waiters.empty());
auto value = kj::mv(values.back());
values.pop_back();
return kj::mv(value);
} else {
// We don't have any values, add ourselves to the waiting queue.
return waiters.wait();
}
}
private:
std::list<T> values;
WaiterQueue<T> waiters;
};
} // namespace kj
KJ_END_HEADER