| // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors |
| // Licensed under the MIT License: |
| // |
| // Permission is hereby granted, free of charge, to any person obtaining a copy |
| // of this software and associated documentation files (the "Software"), to deal |
| // in the Software without restriction, including without limitation the rights |
| // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
| // copies of the Software, and to permit persons to whom the Software is |
| // furnished to do so, subject to the following conditions: |
| // |
| // The above copyright notice and this permission notice shall be included in |
| // all copies or substantial portions of the Software. |
| // |
| // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
| // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
| // THE SOFTWARE. |
| |
| #pragma once |
| |
| #if defined(__GNUC__) && !defined(CAPNP_HEADER_WARNINGS) |
| #pragma GCC system_header |
| #endif |
| |
| #include "common.h" |
| #include <capnp/serialize.h> |
| #include <capnp/serialize-packed.h> |
| #include <kj/debug.h> |
| #if HAVE_SNAPPY |
| #include <capnp/serialize-snappy.h> |
| #endif // HAVE_SNAPPY |
| #include <thread> |
| |
| namespace capnp { |
| namespace benchmark { |
| namespace capnp { |
| |
| class CountingOutputStream: public kj::FdOutputStream { |
| public: |
| CountingOutputStream(int fd): FdOutputStream(fd), throughput(0) {} |
| |
| uint64_t throughput; |
| |
| void write(const void* buffer, size_t size) override { |
| FdOutputStream::write(buffer, size); |
| throughput += size; |
| } |
| |
| void write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override { |
| FdOutputStream::write(pieces); |
| for (auto& piece: pieces) { |
| throughput += piece.size(); |
| } |
| } |
| }; |
| |
| // ======================================================================================= |
| |
| struct Uncompressed { |
| typedef kj::FdInputStream& BufferedInput; |
| typedef InputStreamMessageReader MessageReader; |
| |
| class ArrayMessageReader: public FlatArrayMessageReader { |
| public: |
| ArrayMessageReader(kj::ArrayPtr<const byte> array, |
| ReaderOptions options = ReaderOptions(), |
| kj::ArrayPtr<word> scratchSpace = nullptr) |
| : FlatArrayMessageReader(kj::arrayPtr( |
| reinterpret_cast<const word*>(array.begin()), |
| reinterpret_cast<const word*>(array.end())), options) {} |
| }; |
| |
| static inline void write(kj::OutputStream& output, MessageBuilder& builder) { |
| writeMessage(output, builder); |
| } |
| }; |
| |
| struct Packed { |
| typedef kj::BufferedInputStreamWrapper BufferedInput; |
| typedef PackedMessageReader MessageReader; |
| |
| class ArrayMessageReader: private kj::ArrayInputStream, public PackedMessageReader { |
| public: |
| ArrayMessageReader(kj::ArrayPtr<const byte> array, |
| ReaderOptions options = ReaderOptions(), |
| kj::ArrayPtr<word> scratchSpace = nullptr) |
| : ArrayInputStream(array), |
| PackedMessageReader(*this, options, scratchSpace) {} |
| }; |
| |
| static inline void write(kj::OutputStream& output, MessageBuilder& builder) { |
| writePackedMessage(output, builder); |
| } |
| |
| static inline void write(kj::BufferedOutputStream& output, MessageBuilder& builder) { |
| writePackedMessage(output, builder); |
| } |
| }; |
| |
| #if HAVE_SNAPPY |
| static byte snappyReadBuffer[SNAPPY_BUFFER_SIZE]; |
| static byte snappyWriteBuffer[SNAPPY_BUFFER_SIZE]; |
| static byte snappyCompressedBuffer[SNAPPY_COMPRESSED_BUFFER_SIZE]; |
| |
| struct SnappyCompressed { |
| typedef BufferedInputStreamWrapper BufferedInput; |
| typedef SnappyPackedMessageReader MessageReader; |
| |
| class ArrayMessageReader: private ArrayInputStream, public SnappyPackedMessageReader { |
| public: |
| ArrayMessageReader(kj::ArrayPtr<const byte> array, |
| ReaderOptions options = ReaderOptions(), |
| kj::ArrayPtr<word> scratchSpace = nullptr) |
| : ArrayInputStream(array), |
| SnappyPackedMessageReader(static_cast<ArrayInputStream&>(*this), options, scratchSpace, |
| kj::arrayPtr(snappyReadBuffer, SNAPPY_BUFFER_SIZE)) {} |
| }; |
| |
| static inline void write(OutputStream& output, MessageBuilder& builder) { |
| writeSnappyPackedMessage(output, builder, |
| kj::arrayPtr(snappyWriteBuffer, SNAPPY_BUFFER_SIZE), |
| kj::arrayPtr(snappyCompressedBuffer, SNAPPY_COMPRESSED_BUFFER_SIZE)); |
| } |
| }; |
| #endif // HAVE_SNAPPY |
| |
| // ======================================================================================= |
| |
| struct NoScratch { |
| struct ScratchSpace {}; |
| |
| template <typename Compression> |
| class MessageReader: public Compression::MessageReader { |
| public: |
| inline MessageReader(typename Compression::BufferedInput& input, ScratchSpace& scratch) |
| : Compression::MessageReader(input) {} |
| }; |
| |
| template <typename Compression> |
| class ArrayMessageReader: public Compression::ArrayMessageReader { |
| public: |
| inline ArrayMessageReader(kj::ArrayPtr<const byte> input, ScratchSpace& scratch) |
| : Compression::ArrayMessageReader(input) {} |
| }; |
| |
| class MessageBuilder: public MallocMessageBuilder { |
| public: |
| inline MessageBuilder(ScratchSpace& scratch): MallocMessageBuilder() {} |
| }; |
| |
| class ObjectSizeCounter { |
| public: |
| ObjectSizeCounter(uint64_t iters): counter(0) {} |
| |
| template <typename RequestBuilder, typename ResponseBuilder> |
| void add(RequestBuilder& request, ResponseBuilder& response) { |
| for (auto segment: request.getSegmentsForOutput()) { |
| counter += segment.size() * sizeof(word); |
| } |
| for (auto segment: response.getSegmentsForOutput()) { |
| counter += segment.size() * sizeof(word); |
| } |
| } |
| |
| uint64_t get() { return counter; } |
| |
| private: |
| uint64_t counter; |
| }; |
| }; |
| |
| constexpr size_t SCRATCH_SIZE = 128 * 1024; |
| word scratchSpace[6 * SCRATCH_SIZE]; |
| int scratchCounter = 0; |
| |
| struct UseScratch { |
| struct ScratchSpace { |
| word* words; |
| |
| ScratchSpace() { |
| KJ_REQUIRE(scratchCounter < 6, "Too many scratch spaces needed at once."); |
| words = scratchSpace + scratchCounter++ * SCRATCH_SIZE; |
| } |
| ~ScratchSpace() noexcept { |
| --scratchCounter; |
| } |
| }; |
| |
| template <typename Compression> |
| class MessageReader: public Compression::MessageReader { |
| public: |
| inline MessageReader(typename Compression::BufferedInput& input, ScratchSpace& scratch) |
| : Compression::MessageReader( |
| input, ReaderOptions(), kj::arrayPtr(scratch.words, SCRATCH_SIZE)) {} |
| }; |
| |
| template <typename Compression> |
| class ArrayMessageReader: public Compression::ArrayMessageReader { |
| public: |
| inline ArrayMessageReader(kj::ArrayPtr<const byte> input, ScratchSpace& scratch) |
| : Compression::ArrayMessageReader( |
| input, ReaderOptions(), kj::arrayPtr(scratch.words, SCRATCH_SIZE)) {} |
| }; |
| |
| class MessageBuilder: public MallocMessageBuilder { |
| public: |
| inline MessageBuilder(ScratchSpace& scratch) |
| : MallocMessageBuilder(kj::arrayPtr(scratch.words, SCRATCH_SIZE)) {} |
| }; |
| |
| class ObjectSizeCounter { |
| public: |
| ObjectSizeCounter(uint64_t iters): iters(iters), maxSize(0) {} |
| |
| template <typename RequestBuilder, typename ResponseBuilder> |
| void add(RequestBuilder& request, ResponseBuilder& response) { |
| size_t counter = 0; |
| for (auto segment: request.getSegmentsForOutput()) { |
| counter += segment.size() * sizeof(word); |
| } |
| for (auto segment: response.getSegmentsForOutput()) { |
| counter += segment.size() * sizeof(word); |
| } |
| maxSize = std::max(counter, maxSize); |
| } |
| |
| uint64_t get() { return iters * maxSize; } |
| |
| private: |
| uint64_t iters; |
| size_t maxSize; |
| }; |
| }; |
| |
| // ======================================================================================= |
| |
| template <typename TestCase, typename ReuseStrategy, typename Compression> |
| struct BenchmarkMethods { |
| static uint64_t syncClient(int inputFd, int outputFd, uint64_t iters) { |
| kj::FdInputStream inputStream(inputFd); |
| typename Compression::BufferedInput bufferedInput(inputStream); |
| |
| CountingOutputStream output(outputFd); |
| typename ReuseStrategy::ScratchSpace builderScratch; |
| typename ReuseStrategy::ScratchSpace readerScratch; |
| |
| for (; iters > 0; --iters) { |
| typename TestCase::Expectation expected; |
| { |
| typename ReuseStrategy::MessageBuilder builder(builderScratch); |
| expected = TestCase::setupRequest( |
| builder.template initRoot<typename TestCase::Request>()); |
| Compression::write(output, builder); |
| } |
| |
| { |
| typename ReuseStrategy::template MessageReader<Compression> reader( |
| bufferedInput, readerScratch); |
| if (!TestCase::checkResponse( |
| reader.template getRoot<typename TestCase::Response>(), expected)) { |
| throw std::logic_error("Incorrect response."); |
| } |
| } |
| } |
| |
| return output.throughput; |
| } |
| |
| static uint64_t asyncClientSender( |
| int outputFd, ProducerConsumerQueue<typename TestCase::Expectation>* expectations, |
| uint64_t iters) { |
| CountingOutputStream output(outputFd); |
| typename ReuseStrategy::ScratchSpace scratch; |
| |
| for (; iters > 0; --iters) { |
| typename ReuseStrategy::MessageBuilder builder(scratch); |
| expectations->post(TestCase::setupRequest( |
| builder.template initRoot<typename TestCase::Request>())); |
| Compression::write(output, builder); |
| } |
| |
| return output.throughput; |
| } |
| |
| static void asyncClientReceiver( |
| int inputFd, ProducerConsumerQueue<typename TestCase::Expectation>* expectations, |
| uint64_t iters) { |
| kj::FdInputStream inputStream(inputFd); |
| typename Compression::BufferedInput bufferedInput(inputStream); |
| |
| typename ReuseStrategy::ScratchSpace scratch; |
| |
| for (; iters > 0; --iters) { |
| typename TestCase::Expectation expected = expectations->next(); |
| typename ReuseStrategy::template MessageReader<Compression> reader(bufferedInput, scratch); |
| if (!TestCase::checkResponse( |
| reader.template getRoot<typename TestCase::Response>(), expected)) { |
| throw std::logic_error("Incorrect response."); |
| } |
| } |
| } |
| |
| static uint64_t asyncClient(int inputFd, int outputFd, uint64_t iters) { |
| ProducerConsumerQueue<typename TestCase::Expectation> expectations; |
| std::thread receiverThread(asyncClientReceiver, inputFd, &expectations, iters); |
| uint64_t throughput = asyncClientSender(outputFd, &expectations, iters); |
| receiverThread.join(); |
| return throughput; |
| } |
| |
| static uint64_t server(int inputFd, int outputFd, uint64_t iters) { |
| kj::FdInputStream inputStream(inputFd); |
| typename Compression::BufferedInput bufferedInput(inputStream); |
| |
| CountingOutputStream output(outputFd); |
| typename ReuseStrategy::ScratchSpace builderScratch; |
| typename ReuseStrategy::ScratchSpace readerScratch; |
| |
| for (; iters > 0; --iters) { |
| typename ReuseStrategy::MessageBuilder builder(builderScratch); |
| typename ReuseStrategy::template MessageReader<Compression> reader( |
| bufferedInput, readerScratch); |
| TestCase::handleRequest(reader.template getRoot<typename TestCase::Request>(), |
| builder.template initRoot<typename TestCase::Response>()); |
| Compression::write(output, builder); |
| } |
| |
| return output.throughput; |
| } |
| |
| static uint64_t passByObject(uint64_t iters, bool countObjectSize) { |
| typename ReuseStrategy::ScratchSpace requestScratch; |
| typename ReuseStrategy::ScratchSpace responseScratch; |
| |
| typename ReuseStrategy::ObjectSizeCounter counter(iters); |
| |
| for (; iters > 0; --iters) { |
| typename ReuseStrategy::MessageBuilder requestMessage(requestScratch); |
| auto request = requestMessage.template initRoot<typename TestCase::Request>(); |
| typename TestCase::Expectation expected = TestCase::setupRequest(request); |
| |
| typename ReuseStrategy::MessageBuilder responseMessage(responseScratch); |
| auto response = responseMessage.template initRoot<typename TestCase::Response>(); |
| TestCase::handleRequest(request.asReader(), response); |
| |
| if (!TestCase::checkResponse(response.asReader(), expected)) { |
| throw std::logic_error("Incorrect response."); |
| } |
| |
| if (countObjectSize) { |
| counter.add(requestMessage, responseMessage); |
| } |
| } |
| |
| return counter.get(); |
| } |
| |
| static uint64_t passByBytes(uint64_t iters) { |
| uint64_t throughput = 0; |
| typename ReuseStrategy::ScratchSpace clientRequestScratch; |
| UseScratch::ScratchSpace requestBytesScratch; |
| typename ReuseStrategy::ScratchSpace serverRequestScratch; |
| typename ReuseStrategy::ScratchSpace serverResponseScratch; |
| UseScratch::ScratchSpace responseBytesScratch; |
| typename ReuseStrategy::ScratchSpace clientResponseScratch; |
| |
| for (; iters > 0; --iters) { |
| typename ReuseStrategy::MessageBuilder requestBuilder(clientRequestScratch); |
| typename TestCase::Expectation expected = TestCase::setupRequest( |
| requestBuilder.template initRoot<typename TestCase::Request>()); |
| |
| kj::ArrayOutputStream requestOutput(kj::arrayPtr( |
| reinterpret_cast<byte*>(requestBytesScratch.words), SCRATCH_SIZE * sizeof(word))); |
| Compression::write(requestOutput, requestBuilder); |
| throughput += requestOutput.getArray().size(); |
| typename ReuseStrategy::template ArrayMessageReader<Compression> requestReader( |
| requestOutput.getArray(), serverRequestScratch); |
| |
| typename ReuseStrategy::MessageBuilder responseBuilder(serverResponseScratch); |
| TestCase::handleRequest(requestReader.template getRoot<typename TestCase::Request>(), |
| responseBuilder.template initRoot<typename TestCase::Response>()); |
| |
| kj::ArrayOutputStream responseOutput( |
| kj::arrayPtr(reinterpret_cast<byte*>(responseBytesScratch.words), |
| SCRATCH_SIZE * sizeof(word))); |
| Compression::write(responseOutput, responseBuilder); |
| throughput += responseOutput.getArray().size(); |
| typename ReuseStrategy::template ArrayMessageReader<Compression> responseReader( |
| responseOutput.getArray(), clientResponseScratch); |
| |
| if (!TestCase::checkResponse( |
| responseReader.template getRoot<typename TestCase::Response>(), expected)) { |
| throw std::logic_error("Incorrect response."); |
| } |
| } |
| |
| return throughput; |
| } |
| }; |
| |
| struct BenchmarkTypes { |
| typedef capnp::Uncompressed Uncompressed; |
| typedef capnp::Packed Packed; |
| #if HAVE_SNAPPY |
| typedef capnp::SnappyCompressed SnappyCompressed; |
| #endif // HAVE_SNAPPY |
| |
| typedef capnp::UseScratch ReusableResources; |
| typedef capnp::NoScratch SingleUseResources; |
| |
| template <typename TestCase, typename ReuseStrategy, typename Compression> |
| struct BenchmarkMethods: public capnp::BenchmarkMethods<TestCase, ReuseStrategy, Compression> {}; |
| }; |
| |
| } // namespace capnp |
| } // namespace benchmark |
| } // namespace capnp |