| // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors |
| // Licensed under the MIT License: |
| // |
| // Permission is hereby granted, free of charge, to any person obtaining a copy |
| // of this software and associated documentation files (the "Software"), to deal |
| // in the Software without restriction, including without limitation the rights |
| // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
| // copies of the Software, and to permit persons to whom the Software is |
| // furnished to do so, subject to the following conditions: |
| // |
| // The above copyright notice and this permission notice shall be included in |
| // all copies or substantial portions of the Software. |
| // |
| // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
| // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
| // THE SOFTWARE. |
| |
| #include "common.h" |
| #include <google/protobuf/io/zero_copy_stream_impl.h> |
| #include <google/protobuf/io/coded_stream.h> |
| #include <thread> |
| #if HAVE_SNAPPY |
| #include <snappy/snappy.h> |
| #include <snappy/snappy-sinksource.h> |
| #endif // HAVE_SNAPPY |
| |
| namespace capnp { |
| namespace benchmark { |
| namespace protobuf { |
| |
| // ======================================================================================= |
| |
| struct SingleUseMessages { |
| template <typename MessageType> |
| struct Message { |
| struct Reusable {}; |
| struct SingleUse: public MessageType { |
| inline SingleUse(Reusable&) {} |
| }; |
| }; |
| |
| struct ReusableString {}; |
| struct SingleUseString: std::string { |
| inline SingleUseString(ReusableString&) {} |
| }; |
| |
| template <typename MessageType> |
| static inline void doneWith(MessageType& message) { |
| // Don't clear -- single-use. |
| } |
| }; |
| |
| struct ReusableMessages { |
| template <typename MessageType> |
| struct Message { |
| struct Reusable: public MessageType {}; |
| typedef MessageType& SingleUse; |
| }; |
| |
| typedef std::string ReusableString; |
| typedef std::string& SingleUseString; |
| |
| template <typename MessageType> |
| static inline void doneWith(MessageType& message) { |
| message.Clear(); |
| } |
| }; |
| |
| // ======================================================================================= |
| // The protobuf Java library defines a format for writing multiple protobufs to a stream, in which |
| // each message is prefixed by a varint size. This was never added to the C++ library. It's easy |
| // to do naively, but tricky to implement without accidentally losing various optimizations. These |
| // two functions should be optimal. |
| |
| struct Uncompressed { |
| typedef google::protobuf::io::FileInputStream InputStream; |
| typedef google::protobuf::io::FileOutputStream OutputStream; |
| |
| static uint64_t write(const google::protobuf::MessageLite& message, |
| google::protobuf::io::FileOutputStream* rawOutput) { |
| google::protobuf::io::CodedOutputStream output(rawOutput); |
| const int size = message.ByteSize(); |
| output.WriteVarint32(size); |
| uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size); |
| if (buffer != NULL) { |
| message.SerializeWithCachedSizesToArray(buffer); |
| } else { |
| message.SerializeWithCachedSizes(&output); |
| if (output.HadError()) { |
| throw OsException(rawOutput->GetErrno()); |
| } |
| } |
| |
| return size; |
| } |
| |
| static void read(google::protobuf::io::ZeroCopyInputStream* rawInput, |
| google::protobuf::MessageLite* message) { |
| google::protobuf::io::CodedInputStream input(rawInput); |
| uint32_t size; |
| GOOGLE_CHECK(input.ReadVarint32(&size)); |
| |
| auto limit = input.PushLimit(size); |
| |
| GOOGLE_CHECK(message->MergePartialFromCodedStream(&input) && |
| input.ConsumedEntireMessage()); |
| |
| input.PopLimit(limit); |
| } |
| |
| static void flush(google::protobuf::io::FileOutputStream* output) { |
| if (!output->Flush()) throw OsException(output->GetErrno()); |
| } |
| }; |
| |
| // ======================================================================================= |
| // The Snappy interface is really obnoxious. I gave up here and am just reading/writing flat |
| // arrays in some static scratch space. This probably gives protobufs an edge that it doesn't |
| // deserve. |
| |
| #if HAVE_SNAPPY |
| |
| static char scratch[1 << 20]; |
| static char scratch2[1 << 20]; |
| |
| struct SnappyCompressed { |
| typedef int InputStream; |
| typedef int OutputStream; |
| |
| static uint64_t write(const google::protobuf::MessageLite& message, int* output) { |
| size_t size = message.ByteSize(); |
| GOOGLE_CHECK_LE(size, sizeof(scratch)); |
| |
| message.SerializeWithCachedSizesToArray(reinterpret_cast<uint8_t*>(scratch)); |
| |
| size_t compressedSize = 0; |
| snappy::RawCompress(scratch, size, scratch2 + sizeof(uint32_t), &compressedSize); |
| uint32_t tag = compressedSize; |
| memcpy(scratch2, &tag, sizeof(tag)); |
| |
| writeAll(*output, scratch2, compressedSize + sizeof(tag)); |
| return compressedSize + sizeof(tag); |
| } |
| |
| static void read(int* input, google::protobuf::MessageLite* message) { |
| uint32_t size; |
| readAll(*input, &size, sizeof(size)); |
| readAll(*input, scratch, size); |
| |
| size_t uncompressedSize; |
| GOOGLE_CHECK(snappy::GetUncompressedLength(scratch, size, &uncompressedSize)); |
| GOOGLE_CHECK(snappy::RawUncompress(scratch, size, scratch2)); |
| |
| GOOGLE_CHECK(message->ParsePartialFromArray(scratch2, uncompressedSize)); |
| } |
| |
| static void flush(OutputStream*) {} |
| }; |
| |
| #endif // HAVE_SNAPPY |
| |
| // ======================================================================================= |
| |
| #define REUSABLE(type) \ |
| typename ReuseStrategy::template Message<typename TestCase::type>::Reusable |
| #define SINGLE_USE(type) \ |
| typename ReuseStrategy::template Message<typename TestCase::type>::SingleUse |
| |
| template <typename TestCase, typename ReuseStrategy, typename Compression> |
| struct BenchmarkMethods { |
| static uint64_t syncClient(int inputFd, int outputFd, uint64_t iters) { |
| uint64_t throughput = 0; |
| |
| typename Compression::OutputStream output(outputFd); |
| typename Compression::InputStream input(inputFd); |
| |
| REUSABLE(Request) reusableRequest; |
| REUSABLE(Response) reusableResponse; |
| |
| for (; iters > 0; --iters) { |
| SINGLE_USE(Request) request(reusableRequest); |
| typename TestCase::Expectation expected = TestCase::setupRequest(&request); |
| throughput += Compression::write(request, &output); |
| Compression::flush(&output); |
| ReuseStrategy::doneWith(request); |
| |
| SINGLE_USE(Response) response(reusableResponse); |
| Compression::read(&input, &response); |
| if (!TestCase::checkResponse(response, expected)) { |
| throw std::logic_error("Incorrect response."); |
| } |
| ReuseStrategy::doneWith(response); |
| } |
| |
| return throughput; |
| } |
| |
| static uint64_t asyncClientSender( |
| int outputFd, ProducerConsumerQueue<typename TestCase::Expectation>* expectations, |
| uint64_t iters) { |
| uint64_t throughput = 0; |
| |
| typename Compression::OutputStream output(outputFd); |
| REUSABLE(Request) reusableRequest; |
| |
| for (; iters > 0; --iters) { |
| SINGLE_USE(Request) request(reusableRequest); |
| expectations->post(TestCase::setupRequest(&request)); |
| throughput += Compression::write(request, &output); |
| Compression::flush(&output); |
| ReuseStrategy::doneWith(request); |
| } |
| |
| return throughput; |
| } |
| |
| static void asyncClientReceiver( |
| int inputFd, ProducerConsumerQueue<typename TestCase::Expectation>* expectations, |
| uint64_t iters) { |
| typename Compression::InputStream input(inputFd); |
| REUSABLE(Response) reusableResponse; |
| |
| for (; iters > 0; --iters) { |
| typename TestCase::Expectation expected = expectations->next(); |
| SINGLE_USE(Response) response(reusableResponse); |
| Compression::read(&input, &response); |
| if (!TestCase::checkResponse(response, expected)) { |
| throw std::logic_error("Incorrect response."); |
| } |
| ReuseStrategy::doneWith(response); |
| } |
| } |
| |
| static uint64_t asyncClient(int inputFd, int outputFd, uint64_t iters) { |
| ProducerConsumerQueue<typename TestCase::Expectation> expectations; |
| std::thread receiverThread(asyncClientReceiver, inputFd, &expectations, iters); |
| uint64_t throughput = asyncClientSender(outputFd, &expectations, iters); |
| receiverThread.join(); |
| |
| return throughput; |
| } |
| |
| static uint64_t server(int inputFd, int outputFd, uint64_t iters) { |
| uint64_t throughput = 0; |
| |
| typename Compression::OutputStream output(outputFd); |
| typename Compression::InputStream input(inputFd); |
| |
| REUSABLE(Request) reusableRequest; |
| REUSABLE(Response) reusableResponse; |
| |
| for (; iters > 0; --iters) { |
| SINGLE_USE(Request) request(reusableRequest); |
| Compression::read(&input, &request); |
| |
| SINGLE_USE(Response) response(reusableResponse); |
| TestCase::handleRequest(request, &response); |
| ReuseStrategy::doneWith(request); |
| |
| throughput += Compression::write(response, &output); |
| Compression::flush(&output); |
| ReuseStrategy::doneWith(response); |
| } |
| |
| return throughput; |
| } |
| |
| static uint64_t passByObject(uint64_t iters, bool countObjectSize) { |
| uint64_t throughput = 0; |
| |
| REUSABLE(Request) reusableRequest; |
| REUSABLE(Response) reusableResponse; |
| |
| for (; iters > 0; --iters) { |
| SINGLE_USE(Request) request(reusableRequest); |
| typename TestCase::Expectation expected = TestCase::setupRequest(&request); |
| |
| SINGLE_USE(Response) response(reusableResponse); |
| TestCase::handleRequest(request, &response); |
| ReuseStrategy::doneWith(request); |
| if (!TestCase::checkResponse(response, expected)) { |
| throw std::logic_error("Incorrect response."); |
| } |
| ReuseStrategy::doneWith(response); |
| |
| if (countObjectSize) { |
| throughput += request.SpaceUsed(); |
| throughput += response.SpaceUsed(); |
| } |
| } |
| |
| return throughput; |
| } |
| |
| static uint64_t passByBytes(uint64_t iters) { |
| uint64_t throughput = 0; |
| |
| REUSABLE(Request) reusableClientRequest; |
| REUSABLE(Request) reusableServerRequest; |
| REUSABLE(Response) reusableServerResponse; |
| REUSABLE(Response) reusableClientResponse; |
| typename ReuseStrategy::ReusableString reusableRequestString, reusableResponseString; |
| |
| for (; iters > 0; --iters) { |
| SINGLE_USE(Request) clientRequest(reusableClientRequest); |
| typename TestCase::Expectation expected = TestCase::setupRequest(&clientRequest); |
| |
| typename ReuseStrategy::SingleUseString requestString(reusableRequestString); |
| clientRequest.SerializePartialToString(&requestString); |
| throughput += requestString.size(); |
| ReuseStrategy::doneWith(clientRequest); |
| |
| SINGLE_USE(Request) serverRequest(reusableServerRequest); |
| serverRequest.ParsePartialFromString(requestString); |
| |
| SINGLE_USE(Response) serverResponse(reusableServerResponse); |
| TestCase::handleRequest(serverRequest, &serverResponse); |
| ReuseStrategy::doneWith(serverRequest); |
| |
| typename ReuseStrategy::SingleUseString responseString(reusableResponseString); |
| serverResponse.SerializePartialToString(&responseString); |
| throughput += responseString.size(); |
| ReuseStrategy::doneWith(serverResponse); |
| |
| SINGLE_USE(Response) clientResponse(reusableClientResponse); |
| clientResponse.ParsePartialFromString(responseString); |
| |
| if (!TestCase::checkResponse(clientResponse, expected)) { |
| throw std::logic_error("Incorrect response."); |
| } |
| ReuseStrategy::doneWith(clientResponse); |
| } |
| |
| return throughput; |
| } |
| }; |
| |
| struct BenchmarkTypes { |
| typedef protobuf::Uncompressed Uncompressed; |
| typedef protobuf::Uncompressed Packed; |
| #if HAVE_SNAPPY |
| typedef protobuf::SnappyCompressed SnappyCompressed; |
| #endif // HAVE_SNAPPY |
| |
| typedef protobuf::ReusableMessages ReusableResources; |
| typedef protobuf::SingleUseMessages SingleUseResources; |
| |
| template <typename TestCase, typename ReuseStrategy, typename Compression> |
| struct BenchmarkMethods |
| : public protobuf::BenchmarkMethods<TestCase, ReuseStrategy, Compression> {}; |
| }; |
| |
| } // namespace protobuf |
| } // namespace benchmark |
| } // namespace capnp |