// blob: c4ea24427a4e737ab830d077f5f31402e5876b79 [file] [log] [blame]
// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#include "common.h"
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/io/coded_stream.h>
#include <thread>
#if HAVE_SNAPPY
#include <snappy/snappy.h>
#include <snappy/snappy-sinksource.h>
#endif // HAVE_SNAPPY
namespace capnp {
namespace benchmark {
namespace protobuf {
// =======================================================================================
// Reuse strategy in which every iteration works on brand-new objects. The "reusable" types are
// empty placeholders; the single-use types are the real message/string, default-constructed
// from (and ignoring) the placeholder.
struct SingleUseMessages {
  template <typename MessageType>
  struct Message {
    // Empty tag type; holds no state.
    struct Reusable {};

    // A fresh MessageType. The Reusable argument is accepted only so that call sites look the
    // same for every reuse strategy.
    struct SingleUse : MessageType {
      SingleUse(Reusable&) {}
    };
  };

  // Empty tag type; holds no state.
  struct ReusableString {};

  // A fresh, empty std::string. The ReusableString argument is ignored.
  struct SingleUseString : std::string {
    SingleUseString(ReusableString&) {}
  };

  // Intentionally a no-op: the object is single-use, so it is left untouched rather than cleared.
  template <typename MessageType>
  static void doneWith(MessageType&) {}
};
// Reuse strategy in which one long-lived object is recycled across iterations. The "reusable"
// types own the storage; the single-use types are merely references to that storage.
struct ReusableMessages {
  template <typename MessageType>
  struct Message {
    // The long-lived object, declared once outside the benchmark loop.
    struct Reusable : MessageType {};

    // Per-iteration handle: a reference bound to the Reusable object.
    using SingleUse = MessageType&;
  };

  // Long-lived string and the per-iteration reference to it.
  using ReusableString = std::string;
  using SingleUseString = std::string&;

  // Resets the message via its Clear() member so that the next iteration can use it again.
  template <typename MessageType>
  static void doneWith(MessageType& message) {
    message.Clear();
  }
};
// =======================================================================================
// The protobuf Java library defines a format for writing multiple protobufs to a stream, in which
// each message is prefixed by a varint size. This was never added to the C++ library. It's easy
// to do naively, but tricky to implement without accidentally losing various optimizations. These
// two functions should be optimal.
struct Uncompressed {
  using InputStream = google::protobuf::io::FileInputStream;
  using OutputStream = google::protobuf::io::FileOutputStream;

  // Writes `message` to `rawOutput`, preceded by its byte size encoded as a varint32.
  // Returns the message's byte size; the varint prefix is not included in the returned count.
  // Throws OsException carrying rawOutput->GetErrno() if the coded stream reports an error on
  // the fallback (non-direct-buffer) path.
  static uint64_t write(const google::protobuf::MessageLite& message,
                        google::protobuf::io::FileOutputStream* rawOutput) {
    google::protobuf::io::CodedOutputStream output(rawOutput);
    const int size = message.ByteSize();
    output.WriteVarint32(size);

    // Preferred path: when the stream hands back a direct buffer of `size` bytes, serialize
    // straight into it using the sizes cached by ByteSize() above.
    if (uint8_t* direct = output.GetDirectBufferForNBytesAndAdvance(size)) {
      message.SerializeWithCachedSizesToArray(direct);
      return size;
    }

    // No direct buffer available: go through the coded stream and surface any I/O error.
    message.SerializeWithCachedSizes(&output);
    if (output.HadError()) {
      throw OsException(rawOutput->GetErrno());
    }
    return size;
  }

  // Reads one varint32-size-prefixed message from `rawInput` and merges it into `*message`.
  // Each step is wrapped in GOOGLE_CHECK: reading the size prefix, parsing, and the parse
  // having consumed the entire message.
  static void read(google::protobuf::io::ZeroCopyInputStream* rawInput,
                   google::protobuf::MessageLite* message) {
    google::protobuf::io::CodedInputStream input(rawInput);

    uint32_t size;
    GOOGLE_CHECK(input.ReadVarint32(&size));

    // Limit the parser to this message's bytes for the duration of the parse.
    const auto messageLimit = input.PushLimit(size);
    GOOGLE_CHECK(message->MergePartialFromCodedStream(&input) &&
                 input.ConsumedEntireMessage());
    input.PopLimit(messageLimit);
  }

  // Flushes `output`; throws OsException carrying output->GetErrno() if Flush() reports failure.
  static void flush(google::protobuf::io::FileOutputStream* output) {
    const bool flushed = output->Flush();
    if (!flushed) {
      throw OsException(output->GetErrno());
    }
  }
};
// =======================================================================================
// The Snappy interface is really obnoxious. I gave up here and am just reading/writing flat
// arrays in some static scratch space. This probably gives protobufs an edge that they don't
// deserve.
#if HAVE_SNAPPY
static char scratch[1 << 20];
static char scratch2[1 << 20];
struct SnappyCompressed {
typedef int InputStream;
typedef int OutputStream;
static uint64_t write(const google::protobuf::MessageLite& message, int* output) {
size_t size = message.ByteSize();
GOOGLE_CHECK_LE(size, sizeof(scratch));
message.SerializeWithCachedSizesToArray(reinterpret_cast<uint8_t*>(scratch));
size_t compressedSize = 0;
snappy::RawCompress(scratch, size, scratch2 + sizeof(uint32_t), &compressedSize);
uint32_t tag = compressedSize;
memcpy(scratch2, &tag, sizeof(tag));
writeAll(*output, scratch2, compressedSize + sizeof(tag));
return compressedSize + sizeof(tag);
}
static void read(int* input, google::protobuf::MessageLite* message) {
uint32_t size;
readAll(*input, &size, sizeof(size));
readAll(*input, scratch, size);
size_t uncompressedSize;
GOOGLE_CHECK(snappy::GetUncompressedLength(scratch, size, &uncompressedSize));
GOOGLE_CHECK(snappy::RawUncompress(scratch, size, scratch2));
GOOGLE_CHECK(message->ParsePartialFromArray(scratch2, uncompressedSize));
}
static void flush(OutputStream*) {}
};
#endif // HAVE_SNAPPY
// =======================================================================================
// Shorthands for the message types chosen by the reuse strategy for a given TestCase member
// type. REUSABLE(x) names the object declared once outside a benchmark loop; SINGLE_USE(x)
// names what each iteration constructs from that object.
#define REUSABLE(type) \
  typename ReuseStrategy::template Message<typename TestCase::type>::Reusable
#define SINGLE_USE(type) \
  typename ReuseStrategy::template Message<typename TestCase::type>::SingleUse

// Benchmark drivers, parameterized on the test case, the message reuse strategy, and the
// wire compression scheme.
template <typename TestCase, typename ReuseStrategy, typename Compression>
struct BenchmarkMethods {
  // Lock-step client: for each of `iters` iterations, writes and flushes one request to
  // outputFd, then reads one response from inputFd and validates it.
  // Returns the sum of the values returned by Compression::write().
  // Throws std::logic_error if TestCase::checkResponse() rejects a response.
  static uint64_t syncClient(int inputFd, int outputFd, uint64_t iters) {
    uint64_t totalBytes = 0;
    typename Compression::OutputStream output(outputFd);
    typename Compression::InputStream input(inputFd);
    REUSABLE(Request) requestStorage;
    REUSABLE(Response) responseStorage;

    while (iters > 0) {
      SINGLE_USE(Request) request(requestStorage);
      typename TestCase::Expectation expected = TestCase::setupRequest(&request);
      totalBytes += Compression::write(request, &output);
      Compression::flush(&output);
      ReuseStrategy::doneWith(request);

      SINGLE_USE(Response) response(responseStorage);
      Compression::read(&input, &response);
      if (!TestCase::checkResponse(response, expected)) {
        throw std::logic_error("Incorrect response.");
      }
      ReuseStrategy::doneWith(response);

      --iters;
    }

    return totalBytes;
  }

  // Sending half of the asynchronous client: for each of `iters` iterations, posts the
  // expectation for a new request to `expectations`, then writes and flushes the request to
  // outputFd. Returns the sum of the values returned by Compression::write().
  static uint64_t asyncClientSender(
      int outputFd, ProducerConsumerQueue<typename TestCase::Expectation>* expectations,
      uint64_t iters) {
    uint64_t totalBytes = 0;
    typename Compression::OutputStream output(outputFd);
    REUSABLE(Request) requestStorage;

    while (iters > 0) {
      SINGLE_USE(Request) request(requestStorage);
      expectations->post(TestCase::setupRequest(&request));
      totalBytes += Compression::write(request, &output);
      Compression::flush(&output);
      ReuseStrategy::doneWith(request);

      --iters;
    }

    return totalBytes;
  }

  // Receiving half of the asynchronous client: for each of `iters` iterations, takes the next
  // expectation from `expectations`, reads one response from inputFd, and validates it.
  // Throws std::logic_error if TestCase::checkResponse() rejects a response.
  static void asyncClientReceiver(
      int inputFd, ProducerConsumerQueue<typename TestCase::Expectation>* expectations,
      uint64_t iters) {
    typename Compression::InputStream input(inputFd);
    REUSABLE(Response) responseStorage;

    while (iters > 0) {
      typename TestCase::Expectation expected = expectations->next();
      SINGLE_USE(Response) response(responseStorage);
      Compression::read(&input, &response);
      if (!TestCase::checkResponse(response, expected)) {
        throw std::logic_error("Incorrect response.");
      }
      ReuseStrategy::doneWith(response);

      --iters;
    }
  }

  // Asynchronous client: runs asyncClientReceiver() on a new thread while the calling thread
  // runs asyncClientSender(); the two share one expectation queue. Joins the receiver before
  // returning the sender's byte count.
  static uint64_t asyncClient(int inputFd, int outputFd, uint64_t iters) {
    ProducerConsumerQueue<typename TestCase::Expectation> expectations;
    std::thread receiverThread(asyncClientReceiver, inputFd, &expectations, iters);

    const uint64_t totalBytes = asyncClientSender(outputFd, &expectations, iters);

    receiverThread.join();
    return totalBytes;
  }

  // Server loop: for each of `iters` iterations, reads one request from inputFd, lets the test
  // case produce a response, then writes and flushes the response to outputFd.
  // Returns the sum of the values returned by Compression::write().
  static uint64_t server(int inputFd, int outputFd, uint64_t iters) {
    uint64_t totalBytes = 0;
    typename Compression::OutputStream output(outputFd);
    typename Compression::InputStream input(inputFd);
    REUSABLE(Request) requestStorage;
    REUSABLE(Response) responseStorage;

    while (iters > 0) {
      SINGLE_USE(Request) request(requestStorage);
      Compression::read(&input, &request);

      SINGLE_USE(Response) response(responseStorage);
      TestCase::handleRequest(request, &response);
      ReuseStrategy::doneWith(request);

      totalBytes += Compression::write(response, &output);
      Compression::flush(&output);
      ReuseStrategy::doneWith(response);

      --iters;
    }

    return totalBytes;
  }

  // In-process benchmark with no serialization: the request object is handed directly to
  // TestCase::handleRequest() and the response object directly to TestCase::checkResponse().
  // When `countObjectSize` is true, the return value accumulates request.SpaceUsed() and
  // response.SpaceUsed(), sampled after both objects have been passed to doneWith(); otherwise
  // it returns 0.
  // Throws std::logic_error if TestCase::checkResponse() rejects a response.
  static uint64_t passByObject(uint64_t iters, bool countObjectSize) {
    uint64_t totalBytes = 0;
    REUSABLE(Request) requestStorage;
    REUSABLE(Response) responseStorage;

    while (iters > 0) {
      SINGLE_USE(Request) request(requestStorage);
      typename TestCase::Expectation expected = TestCase::setupRequest(&request);

      SINGLE_USE(Response) response(responseStorage);
      TestCase::handleRequest(request, &response);
      ReuseStrategy::doneWith(request);
      if (!TestCase::checkResponse(response, expected)) {
        throw std::logic_error("Incorrect response.");
      }
      ReuseStrategy::doneWith(response);

      if (countObjectSize) {
        totalBytes += request.SpaceUsed();
        totalBytes += response.SpaceUsed();
      }

      --iters;
    }

    return totalBytes;
  }

  // In-process benchmark that round-trips through serialized strings: the client request is
  // serialized and re-parsed as the server request, and the server response is serialized and
  // re-parsed as the client response. Returns the summed sizes of the serialized request and
  // response strings.
  // Throws std::logic_error if TestCase::checkResponse() rejects a response.
  static uint64_t passByBytes(uint64_t iters) {
    uint64_t totalBytes = 0;
    REUSABLE(Request) clientRequestStorage;
    REUSABLE(Request) serverRequestStorage;
    REUSABLE(Response) serverResponseStorage;
    REUSABLE(Response) clientResponseStorage;
    typename ReuseStrategy::ReusableString requestStringStorage;
    typename ReuseStrategy::ReusableString responseStringStorage;

    while (iters > 0) {
      // Client side: build the request and serialize it.
      SINGLE_USE(Request) clientRequest(clientRequestStorage);
      typename TestCase::Expectation expected = TestCase::setupRequest(&clientRequest);
      typename ReuseStrategy::SingleUseString requestString(requestStringStorage);
      clientRequest.SerializePartialToString(&requestString);
      totalBytes += requestString.size();
      ReuseStrategy::doneWith(clientRequest);

      // Server side: parse the request, handle it, and serialize the response.
      SINGLE_USE(Request) serverRequest(serverRequestStorage);
      serverRequest.ParsePartialFromString(requestString);
      SINGLE_USE(Response) serverResponse(serverResponseStorage);
      TestCase::handleRequest(serverRequest, &serverResponse);
      ReuseStrategy::doneWith(serverRequest);
      typename ReuseStrategy::SingleUseString responseString(responseStringStorage);
      serverResponse.SerializePartialToString(&responseString);
      totalBytes += responseString.size();
      ReuseStrategy::doneWith(serverResponse);

      // Client side again: parse the response and validate it.
      SINGLE_USE(Response) clientResponse(clientResponseStorage);
      clientResponse.ParsePartialFromString(responseString);
      if (!TestCase::checkResponse(clientResponse, expected)) {
        throw std::logic_error("Incorrect response.");
      }
      ReuseStrategy::doneWith(clientResponse);

      --iters;
    }

    return totalBytes;
  }
};
// Bundles this file's protobuf-specific strategy types and benchmark drivers under the
// member names declared below.
struct BenchmarkTypes {
  using Uncompressed = protobuf::Uncompressed;
  // Packed is an alias for the same type as Uncompressed.
  using Packed = protobuf::Uncompressed;
#if HAVE_SNAPPY
  using SnappyCompressed = protobuf::SnappyCompressed;
#endif  // HAVE_SNAPPY

  using ReusableResources = protobuf::ReusableMessages;
  using SingleUseResources = protobuf::SingleUseMessages;

  // Forwards to protobuf::BenchmarkMethods with the same template arguments.
  template <typename TestCase, typename ReuseStrategy, typename Compression>
  struct BenchmarkMethods
      : protobuf::BenchmarkMethods<TestCase, ReuseStrategy, Compression> {};
};
} // namespace protobuf
} // namespace benchmark
} // namespace capnp