// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#pragma once
#include "rpc.h"
#include "message.h"
#include <kj/async-io.h>
#include <capnp/serialize-async.h>
#include <capnp/rpc-twoparty.capnp.h>
#include <kj/one-of.h>
CAPNP_BEGIN_HEADER
namespace capnp {
namespace rpc {
namespace twoparty {
typedef VatId SturdyRefHostId; // For backwards-compatibility with version 0.4.
} // namespace twoparty
} // namespace rpc
typedef VatNetwork<rpc::twoparty::VatId, rpc::twoparty::ProvisionId,
rpc::twoparty::RecipientId, rpc::twoparty::ThirdPartyCapId, rpc::twoparty::JoinResult>
TwoPartyVatNetworkBase;
class TwoPartyVatNetwork: public TwoPartyVatNetworkBase,
private TwoPartyVatNetworkBase::Connection,
private RpcFlowController::WindowGetter {
// A `VatNetwork` that consists of exactly two parties communicating over an arbitrary byte
// stream. This is used to implement the common case of a client/server network.
//
// See `ez-rpc.h` for a simple interface for setting up two-party clients and servers.
// Use `TwoPartyVatNetwork` only if you need the advanced features.
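//
// Illustrative sketch of the typical client-side pattern (assumes `MyInterface` is a hypothetical
// generated interface; the address is a placeholder):
//
//     auto io = kj::setupAsyncIo();
//     auto conn = io.provider->getNetwork().parseAddress("localhost:1234")
//         .wait(io.waitScope)->connect().wait(io.waitScope);
//
//     capnp::TwoPartyVatNetwork network(*conn, capnp::rpc::twoparty::Side::CLIENT);
//     auto rpcSystem = capnp::makeRpcClient(network);
//
//     capnp::MallocMessageBuilder vatIdMessage;
//     auto vatId = vatIdMessage.initRoot<capnp::rpc::twoparty::VatId>();
//     vatId.setSide(capnp::rpc::twoparty::Side::SERVER);
//     MyInterface::Client cap = rpcSystem.bootstrap(vatId).castAs<MyInterface>();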
public:
TwoPartyVatNetwork(MessageStream& msgStream,
rpc::twoparty::Side side, ReaderOptions receiveOptions = ReaderOptions(),
const kj::MonotonicClock& clock = kj::systemCoarseMonotonicClock());
TwoPartyVatNetwork(MessageStream& msgStream, uint maxFdsPerMessage,
rpc::twoparty::Side side, ReaderOptions receiveOptions = ReaderOptions(),
const kj::MonotonicClock& clock = kj::systemCoarseMonotonicClock());
TwoPartyVatNetwork(kj::AsyncIoStream& stream, rpc::twoparty::Side side,
ReaderOptions receiveOptions = ReaderOptions(),
const kj::MonotonicClock& clock = kj::systemCoarseMonotonicClock());
TwoPartyVatNetwork(kj::AsyncCapabilityStream& stream, uint maxFdsPerMessage,
rpc::twoparty::Side side, ReaderOptions receiveOptions = ReaderOptions(),
const kj::MonotonicClock& clock = kj::systemCoarseMonotonicClock());
// To support FD passing, pass an AsyncCapabilityStream or a MessageStream that supports FD
// passing, along with `maxFdsPerMessage`, which specifies the maximum number of file descriptors
// to accept from the peer in any one RPC message. It is important to keep `maxFdsPerMessage`
// low in order to stop DoS attacks that fill up your FD table.
//
// Note that this limit applies only to incoming messages; outgoing messages are allowed to have
// more FDs. Sometimes it makes sense to enforce a limit of zero in one direction while having
// a non-zero limit in the other. For example, in a supervisor/sandbox scenario, typically there
// are many use cases for passing FDs from supervisor to sandbox but no use case for vice versa.
// The supervisor may be configured not to accept any FDs from the sandbox in order to reduce
// risk of DoS attacks.
//
// `clock` is used to calculate the age of the oldest queued message, which is a useful metric
// for detecting queue overload.
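//
// Illustrative supervisor-side sketch (assumes `capStream` is an existing
// kj::AsyncCapabilityStream to the sandbox and `SupervisorImpl` is a hypothetical bootstrap
// implementation); the limit of 0 means the supervisor refuses to receive FDs but may still
// send them:
//
//     capnp::TwoPartyVatNetwork network(capStream, 0, capnp::rpc::twoparty::Side::SERVER);
//     auto rpcSystem = capnp::makeRpcServer(network, kj::heap<SupervisorImpl>());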
KJ_DISALLOW_COPY(TwoPartyVatNetwork);
kj::Promise<void> onDisconnect() { return disconnectPromise.addBranch(); }
// Returns a promise that resolves when the peer disconnects.
rpc::twoparty::Side getSide() { return side; }
size_t getCurrentQueueSize() { return currentQueueSize; }
// Get the number of bytes worth of outgoing messages that are currently queued in memory waiting
// to be sent on this connection. This may be useful for backpressure.
size_t getCurrentQueueCount() { return currentQueueCount; }
// Get the count of outgoing messages that are currently queued in memory waiting
// to be sent on this connection. This may be useful for backpressure.
kj::Duration getOutgoingMessageWaitTime();
// Get how long the current outgoing message has been waiting to be sent on this connection.
// Returns 0 if the queue is empty. This may be useful for backpressure.
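//
// Illustrative backpressure sketch (assumes `network` is a TwoPartyVatNetwork instance; the
// thresholds are arbitrary placeholders):
//
//     if (network.getCurrentQueueSize() > (1u << 20) ||
//         network.getOutgoingMessageWaitTime() > 10 * kj::SECONDS) {
//       // The peer is not keeping up; pause producing new outgoing messages for a while.
//     }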
// implements VatNetwork -----------------------------------------------------
kj::Maybe<kj::Own<TwoPartyVatNetworkBase::Connection>> connect(
rpc::twoparty::VatId::Reader ref) override;
kj::Promise<kj::Own<TwoPartyVatNetworkBase::Connection>> accept() override;
private:
class OutgoingMessageImpl;
class IncomingMessageImpl;
kj::OneOf<MessageStream*, kj::Own<MessageStream>> stream;
// The underlying stream, which we may or may not own. Get a reference to
// this with getStream, rather than reading it directly.
uint maxFdsPerMessage;
rpc::twoparty::Side side;
MallocMessageBuilder peerVatId;
ReaderOptions receiveOptions;
bool accepted = false;
bool solSndbufUnimplemented = false;
// Whether stream.getsockopt(SO_SNDBUF) has been observed to throw UNIMPLEMENTED.
kj::Canceler readCanceler;
kj::Maybe<kj::Exception> readCancelReason;
// Used to propagate write errors into (permanent) read errors.
kj::Maybe<kj::Promise<void>> previousWrite;
// Resolves when the previous write completes. This effectively serves as the write queue.
// Becomes null when shutdown() is called.
kj::Own<kj::PromiseFulfiller<kj::Own<TwoPartyVatNetworkBase::Connection>>> acceptFulfiller;
// Fulfiller for the promise returned by accept() on the client side, or by the second call to
// accept() on the server side. Never fulfilled, because there is only one connection.
kj::ForkedPromise<void> disconnectPromise = nullptr;
size_t currentQueueSize = 0;
size_t currentQueueCount = 0;
const kj::MonotonicClock& clock;
kj::TimePoint currentOutgoingMessageSendTime;
class FulfillerDisposer: public kj::Disposer {
// Hack: TwoPartyVatNetwork is both a VatNetwork and a VatNetwork::Connection. When the RPC
// system detects (or initiates) a disconnection, it drops its reference to the Connection.
// When all references have been dropped, then we want disconnectPromise to be fulfilled.
// So we hand out Own<Connection>s with this disposer attached, so that we can detect when
// they are dropped.
public:
mutable kj::Own<kj::PromiseFulfiller<void>> fulfiller;
mutable uint refcount = 0;
void disposeImpl(void* pointer) const override;
};
FulfillerDisposer disconnectFulfiller;
TwoPartyVatNetwork(
kj::OneOf<MessageStream*, kj::Own<MessageStream>>&& stream,
uint maxFdsPerMessage,
rpc::twoparty::Side side,
ReaderOptions receiveOptions,
const kj::MonotonicClock& clock);
MessageStream& getStream();
kj::Own<TwoPartyVatNetworkBase::Connection> asConnection();
// Returns a pointer to this with the disposer set to disconnectFulfiller.
// implements Connection -----------------------------------------------------
kj::Own<RpcFlowController> newStream() override;
rpc::twoparty::VatId::Reader getPeerVatId() override;
kj::Own<OutgoingRpcMessage> newOutgoingMessage(uint firstSegmentWordSize) override;
kj::Promise<kj::Maybe<kj::Own<IncomingRpcMessage>>> receiveIncomingMessage() override;
kj::Promise<void> shutdown() override;
// implements WindowGetter ---------------------------------------------------
size_t getWindow() override;
};
class TwoPartyServer: private kj::TaskSet::ErrorHandler {
// Convenience class implementing a simple server that accepts connections on a listener
// socket and services them as two-party connections.
public:
explicit TwoPartyServer(Capability::Client bootstrapInterface);
void accept(kj::Own<kj::AsyncIoStream>&& connection);
void accept(kj::Own<kj::AsyncCapabilityStream>&& connection, uint maxFdsPerMessage);
// Accepts the connection for servicing.
kj::Promise<void> accept(kj::AsyncIoStream& connection) KJ_WARN_UNUSED_RESULT;
kj::Promise<void> accept(kj::AsyncCapabilityStream& connection, uint maxFdsPerMessage)
KJ_WARN_UNUSED_RESULT;
// Accept connection without taking ownership. The returned promise resolves when the client
// disconnects. Dropping the promise forcefully cancels the RPC protocol.
//
// You probably can't do anything with `connection` after the RPC protocol has terminated, other
// than to close it. The main reason to use these methods rather than the ownership-taking ones
// is when your stream object becomes invalid outside some scope: cancelling the promise ensures
// that all use of the stream stops before that point.
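//
// Illustrative sketch (assumes `listener`, `waitScope`, and the placeholder implementation
// `MyImpl` are set up elsewhere):
//
//     {
//       auto stream = listener->accept().wait(waitScope);
//       capnp::TwoPartyServer server(kj::heap<MyImpl>());
//       server.accept(*stream).wait(waitScope);  // resolves when the client disconnects
//     }  // `server` (and its promise) are destroyed before `stream`, so use of the stream
//        // stops first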
kj::Promise<void> listen(kj::ConnectionReceiver& listener);
// Listens for connections on the given listener. The returned promise never resolves unless an
// exception is thrown while trying to accept. You may discard the returned promise to cancel
// listening.
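//
// Illustrative sketch (assumes an async I/O context; `MyImpl` and the bind address are
// placeholders):
//
//     auto io = kj::setupAsyncIo();
//     capnp::TwoPartyServer server(kj::heap<MyImpl>());
//     auto listener = io.provider->getNetwork().parseAddress("*:1234")
//         .wait(io.waitScope)->listen();
//     server.listen(*listener).wait(io.waitScope);  // runs until an accept error is thrown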
kj::Promise<void> listenCapStreamReceiver(
kj::ConnectionReceiver& listener, uint maxFdsPerMessage);
// Listen with support for FD transfers. `listener.accept()` must return instances of
// AsyncCapabilityStream, otherwise this will crash.
kj::Promise<void> drain() { return tasks.onEmpty(); }
// Resolves when all clients have disconnected.
//
// Only considers clients whose connections TwoPartyServer took ownership of.
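//
// Illustrative graceful-shutdown sketch (assumes `shutdownRequested` is a hypothetical
// kj::Promise<void> signaled elsewhere, and that `server`, `listener`, and `waitScope`
// already exist):
//
//     kj::Maybe<kj::Promise<void>> listenTask =
//         server.listen(*listener).eagerlyEvaluate(nullptr);
//     shutdownRequested.wait(waitScope);   // wait for the signal to stop accepting
//     listenTask = nullptr;                // cancel the listen loop
//     server.drain().wait(waitScope);      // wait for already-accepted clients to disconnect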
private:
Capability::Client bootstrapInterface;
kj::TaskSet tasks;
struct AcceptedConnection;
void taskFailed(kj::Exception&& exception) override;
};
class TwoPartyClient {
// Convenience class which implements a simple client.
public:
explicit TwoPartyClient(kj::AsyncIoStream& connection);
explicit TwoPartyClient(kj::AsyncCapabilityStream& connection, uint maxFdsPerMessage);
TwoPartyClient(kj::AsyncIoStream& connection, Capability::Client bootstrapInterface,
rpc::twoparty::Side side = rpc::twoparty::Side::CLIENT);
TwoPartyClient(kj::AsyncCapabilityStream& connection, uint maxFdsPerMessage,
Capability::Client bootstrapInterface,
rpc::twoparty::Side side = rpc::twoparty::Side::CLIENT);
Capability::Client bootstrap();
// Get the server's bootstrap interface.
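//
// Illustrative sketch (assumes `conn` is an already-connected kj::AsyncIoStream, `waitScope` is
// the event loop's wait scope, and `MyInterface`/`someMethod` are hypothetical):
//
//     capnp::TwoPartyClient client(conn);
//     MyInterface::Client cap = client.bootstrap().castAs<MyInterface>();
//     auto response = cap.someMethodRequest().send().wait(waitScope);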
inline kj::Promise<void> onDisconnect() { return network.onDisconnect(); }
void setTraceEncoder(kj::Function<kj::String(const kj::Exception&)> func);
// Forwarded to rpcSystem.setTraceEncoder().
size_t getCurrentQueueSize() { return network.getCurrentQueueSize(); }
size_t getCurrentQueueCount() { return network.getCurrentQueueCount(); }
kj::Duration getOutgoingMessageWaitTime() { return network.getOutgoingMessageWaitTime(); }
private:
TwoPartyVatNetwork network;
RpcSystem<rpc::twoparty::VatId> rpcSystem;
};
} // namespace capnp
CAPNP_END_HEADER