blob: 8200328c1196ea636f0e686d23d841d9c8c1df14 [file] [log] [blame]
// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#pragma once
#if defined(__GNUC__) && !defined(CAPNP_HEADER_WARNINGS)
#pragma GCC system_header
#endif
#include "common.h"
#include <capnp/serialize.h>
#include <capnp/serialize-packed.h>
#include <kj/debug.h>
#if HAVE_SNAPPY
#include <capnp/serialize-snappy.h>
#endif // HAVE_SNAPPY
#include <thread>
namespace capnp {
namespace benchmark {
namespace capnp {
class CountingOutputStream: public kj::FdOutputStream {
public:
CountingOutputStream(int fd): FdOutputStream(fd), throughput(0) {}
uint64_t throughput;
void write(const void* buffer, size_t size) override {
FdOutputStream::write(buffer, size);
throughput += size;
}
void write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
FdOutputStream::write(pieces);
for (auto& piece: pieces) {
throughput += piece.size();
}
}
};
// =======================================================================================
struct Uncompressed {
typedef kj::FdInputStream& BufferedInput;
typedef InputStreamMessageReader MessageReader;
class ArrayMessageReader: public FlatArrayMessageReader {
public:
ArrayMessageReader(kj::ArrayPtr<const byte> array,
ReaderOptions options = ReaderOptions(),
kj::ArrayPtr<word> scratchSpace = nullptr)
: FlatArrayMessageReader(kj::arrayPtr(
reinterpret_cast<const word*>(array.begin()),
reinterpret_cast<const word*>(array.end())), options) {}
};
static inline void write(kj::OutputStream& output, MessageBuilder& builder) {
writeMessage(output, builder);
}
};
struct Packed {
typedef kj::BufferedInputStreamWrapper BufferedInput;
typedef PackedMessageReader MessageReader;
class ArrayMessageReader: private kj::ArrayInputStream, public PackedMessageReader {
public:
ArrayMessageReader(kj::ArrayPtr<const byte> array,
ReaderOptions options = ReaderOptions(),
kj::ArrayPtr<word> scratchSpace = nullptr)
: ArrayInputStream(array),
PackedMessageReader(*this, options, scratchSpace) {}
};
static inline void write(kj::OutputStream& output, MessageBuilder& builder) {
writePackedMessage(output, builder);
}
static inline void write(kj::BufferedOutputStream& output, MessageBuilder& builder) {
writePackedMessage(output, builder);
}
};
#if HAVE_SNAPPY
static byte snappyReadBuffer[SNAPPY_BUFFER_SIZE];
static byte snappyWriteBuffer[SNAPPY_BUFFER_SIZE];
static byte snappyCompressedBuffer[SNAPPY_COMPRESSED_BUFFER_SIZE];
struct SnappyCompressed {
typedef BufferedInputStreamWrapper BufferedInput;
typedef SnappyPackedMessageReader MessageReader;
class ArrayMessageReader: private ArrayInputStream, public SnappyPackedMessageReader {
public:
ArrayMessageReader(kj::ArrayPtr<const byte> array,
ReaderOptions options = ReaderOptions(),
kj::ArrayPtr<word> scratchSpace = nullptr)
: ArrayInputStream(array),
SnappyPackedMessageReader(static_cast<ArrayInputStream&>(*this), options, scratchSpace,
kj::arrayPtr(snappyReadBuffer, SNAPPY_BUFFER_SIZE)) {}
};
static inline void write(OutputStream& output, MessageBuilder& builder) {
writeSnappyPackedMessage(output, builder,
kj::arrayPtr(snappyWriteBuffer, SNAPPY_BUFFER_SIZE),
kj::arrayPtr(snappyCompressedBuffer, SNAPPY_COMPRESSED_BUFFER_SIZE));
}
};
#endif // HAVE_SNAPPY
// =======================================================================================
struct NoScratch {
struct ScratchSpace {};
template <typename Compression>
class MessageReader: public Compression::MessageReader {
public:
inline MessageReader(typename Compression::BufferedInput& input, ScratchSpace& scratch)
: Compression::MessageReader(input) {}
};
template <typename Compression>
class ArrayMessageReader: public Compression::ArrayMessageReader {
public:
inline ArrayMessageReader(kj::ArrayPtr<const byte> input, ScratchSpace& scratch)
: Compression::ArrayMessageReader(input) {}
};
class MessageBuilder: public MallocMessageBuilder {
public:
inline MessageBuilder(ScratchSpace& scratch): MallocMessageBuilder() {}
};
class ObjectSizeCounter {
public:
ObjectSizeCounter(uint64_t iters): counter(0) {}
template <typename RequestBuilder, typename ResponseBuilder>
void add(RequestBuilder& request, ResponseBuilder& response) {
for (auto segment: request.getSegmentsForOutput()) {
counter += segment.size() * sizeof(word);
}
for (auto segment: response.getSegmentsForOutput()) {
counter += segment.size() * sizeof(word);
}
}
uint64_t get() { return counter; }
private:
uint64_t counter;
};
};
constexpr size_t SCRATCH_SIZE = 128 * 1024;
word scratchSpace[6 * SCRATCH_SIZE];
int scratchCounter = 0;
struct UseScratch {
struct ScratchSpace {
word* words;
ScratchSpace() {
KJ_REQUIRE(scratchCounter < 6, "Too many scratch spaces needed at once.");
words = scratchSpace + scratchCounter++ * SCRATCH_SIZE;
}
~ScratchSpace() noexcept {
--scratchCounter;
}
};
template <typename Compression>
class MessageReader: public Compression::MessageReader {
public:
inline MessageReader(typename Compression::BufferedInput& input, ScratchSpace& scratch)
: Compression::MessageReader(
input, ReaderOptions(), kj::arrayPtr(scratch.words, SCRATCH_SIZE)) {}
};
template <typename Compression>
class ArrayMessageReader: public Compression::ArrayMessageReader {
public:
inline ArrayMessageReader(kj::ArrayPtr<const byte> input, ScratchSpace& scratch)
: Compression::ArrayMessageReader(
input, ReaderOptions(), kj::arrayPtr(scratch.words, SCRATCH_SIZE)) {}
};
class MessageBuilder: public MallocMessageBuilder {
public:
inline MessageBuilder(ScratchSpace& scratch)
: MallocMessageBuilder(kj::arrayPtr(scratch.words, SCRATCH_SIZE)) {}
};
class ObjectSizeCounter {
public:
ObjectSizeCounter(uint64_t iters): iters(iters), maxSize(0) {}
template <typename RequestBuilder, typename ResponseBuilder>
void add(RequestBuilder& request, ResponseBuilder& response) {
size_t counter = 0;
for (auto segment: request.getSegmentsForOutput()) {
counter += segment.size() * sizeof(word);
}
for (auto segment: response.getSegmentsForOutput()) {
counter += segment.size() * sizeof(word);
}
maxSize = std::max(counter, maxSize);
}
uint64_t get() { return iters * maxSize; }
private:
uint64_t iters;
size_t maxSize;
};
};
// =======================================================================================
template <typename TestCase, typename ReuseStrategy, typename Compression>
struct BenchmarkMethods {
static uint64_t syncClient(int inputFd, int outputFd, uint64_t iters) {
kj::FdInputStream inputStream(inputFd);
typename Compression::BufferedInput bufferedInput(inputStream);
CountingOutputStream output(outputFd);
typename ReuseStrategy::ScratchSpace builderScratch;
typename ReuseStrategy::ScratchSpace readerScratch;
for (; iters > 0; --iters) {
typename TestCase::Expectation expected;
{
typename ReuseStrategy::MessageBuilder builder(builderScratch);
expected = TestCase::setupRequest(
builder.template initRoot<typename TestCase::Request>());
Compression::write(output, builder);
}
{
typename ReuseStrategy::template MessageReader<Compression> reader(
bufferedInput, readerScratch);
if (!TestCase::checkResponse(
reader.template getRoot<typename TestCase::Response>(), expected)) {
throw std::logic_error("Incorrect response.");
}
}
}
return output.throughput;
}
static uint64_t asyncClientSender(
int outputFd, ProducerConsumerQueue<typename TestCase::Expectation>* expectations,
uint64_t iters) {
CountingOutputStream output(outputFd);
typename ReuseStrategy::ScratchSpace scratch;
for (; iters > 0; --iters) {
typename ReuseStrategy::MessageBuilder builder(scratch);
expectations->post(TestCase::setupRequest(
builder.template initRoot<typename TestCase::Request>()));
Compression::write(output, builder);
}
return output.throughput;
}
static void asyncClientReceiver(
int inputFd, ProducerConsumerQueue<typename TestCase::Expectation>* expectations,
uint64_t iters) {
kj::FdInputStream inputStream(inputFd);
typename Compression::BufferedInput bufferedInput(inputStream);
typename ReuseStrategy::ScratchSpace scratch;
for (; iters > 0; --iters) {
typename TestCase::Expectation expected = expectations->next();
typename ReuseStrategy::template MessageReader<Compression> reader(bufferedInput, scratch);
if (!TestCase::checkResponse(
reader.template getRoot<typename TestCase::Response>(), expected)) {
throw std::logic_error("Incorrect response.");
}
}
}
static uint64_t asyncClient(int inputFd, int outputFd, uint64_t iters) {
ProducerConsumerQueue<typename TestCase::Expectation> expectations;
std::thread receiverThread(asyncClientReceiver, inputFd, &expectations, iters);
uint64_t throughput = asyncClientSender(outputFd, &expectations, iters);
receiverThread.join();
return throughput;
}
static uint64_t server(int inputFd, int outputFd, uint64_t iters) {
kj::FdInputStream inputStream(inputFd);
typename Compression::BufferedInput bufferedInput(inputStream);
CountingOutputStream output(outputFd);
typename ReuseStrategy::ScratchSpace builderScratch;
typename ReuseStrategy::ScratchSpace readerScratch;
for (; iters > 0; --iters) {
typename ReuseStrategy::MessageBuilder builder(builderScratch);
typename ReuseStrategy::template MessageReader<Compression> reader(
bufferedInput, readerScratch);
TestCase::handleRequest(reader.template getRoot<typename TestCase::Request>(),
builder.template initRoot<typename TestCase::Response>());
Compression::write(output, builder);
}
return output.throughput;
}
static uint64_t passByObject(uint64_t iters, bool countObjectSize) {
typename ReuseStrategy::ScratchSpace requestScratch;
typename ReuseStrategy::ScratchSpace responseScratch;
typename ReuseStrategy::ObjectSizeCounter counter(iters);
for (; iters > 0; --iters) {
typename ReuseStrategy::MessageBuilder requestMessage(requestScratch);
auto request = requestMessage.template initRoot<typename TestCase::Request>();
typename TestCase::Expectation expected = TestCase::setupRequest(request);
typename ReuseStrategy::MessageBuilder responseMessage(responseScratch);
auto response = responseMessage.template initRoot<typename TestCase::Response>();
TestCase::handleRequest(request.asReader(), response);
if (!TestCase::checkResponse(response.asReader(), expected)) {
throw std::logic_error("Incorrect response.");
}
if (countObjectSize) {
counter.add(requestMessage, responseMessage);
}
}
return counter.get();
}
static uint64_t passByBytes(uint64_t iters) {
uint64_t throughput = 0;
typename ReuseStrategy::ScratchSpace clientRequestScratch;
UseScratch::ScratchSpace requestBytesScratch;
typename ReuseStrategy::ScratchSpace serverRequestScratch;
typename ReuseStrategy::ScratchSpace serverResponseScratch;
UseScratch::ScratchSpace responseBytesScratch;
typename ReuseStrategy::ScratchSpace clientResponseScratch;
for (; iters > 0; --iters) {
typename ReuseStrategy::MessageBuilder requestBuilder(clientRequestScratch);
typename TestCase::Expectation expected = TestCase::setupRequest(
requestBuilder.template initRoot<typename TestCase::Request>());
kj::ArrayOutputStream requestOutput(kj::arrayPtr(
reinterpret_cast<byte*>(requestBytesScratch.words), SCRATCH_SIZE * sizeof(word)));
Compression::write(requestOutput, requestBuilder);
throughput += requestOutput.getArray().size();
typename ReuseStrategy::template ArrayMessageReader<Compression> requestReader(
requestOutput.getArray(), serverRequestScratch);
typename ReuseStrategy::MessageBuilder responseBuilder(serverResponseScratch);
TestCase::handleRequest(requestReader.template getRoot<typename TestCase::Request>(),
responseBuilder.template initRoot<typename TestCase::Response>());
kj::ArrayOutputStream responseOutput(
kj::arrayPtr(reinterpret_cast<byte*>(responseBytesScratch.words),
SCRATCH_SIZE * sizeof(word)));
Compression::write(responseOutput, responseBuilder);
throughput += responseOutput.getArray().size();
typename ReuseStrategy::template ArrayMessageReader<Compression> responseReader(
responseOutput.getArray(), clientResponseScratch);
if (!TestCase::checkResponse(
responseReader.template getRoot<typename TestCase::Response>(), expected)) {
throw std::logic_error("Incorrect response.");
}
}
return throughput;
}
};
struct BenchmarkTypes {
typedef capnp::Uncompressed Uncompressed;
typedef capnp::Packed Packed;
#if HAVE_SNAPPY
typedef capnp::SnappyCompressed SnappyCompressed;
#endif // HAVE_SNAPPY
typedef capnp::UseScratch ReusableResources;
typedef capnp::NoScratch SingleUseResources;
template <typename TestCase, typename ReuseStrategy, typename Compression>
struct BenchmarkMethods: public capnp::BenchmarkMethods<TestCase, ReuseStrategy, Compression> {};
};
} // namespace capnp
} // namespace benchmark
} // namespace capnp