| // Copyright (c) 2017 Cloudflare, Inc. and contributors |
| // Licensed under the MIT License: |
| // |
| // Permission is hereby granted, free of charge, to any person obtaining a copy |
| // of this software and associated documentation files (the "Software"), to deal |
| // in the Software without restriction, including without limitation the rights |
| // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
| // copies of the Software, and to permit persons to whom the Software is |
| // furnished to do so, subject to the following conditions: |
| // |
| // The above copyright notice and this permission notice shall be included in |
| // all copies or substantial portions of the Software. |
| // |
| // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
| // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
| // THE SOFTWARE. |
| |
| #if KJ_HAS_ZLIB |
| |
| #include "gzip.h" |
| #include <kj/debug.h> |
| |
| namespace kj { |
| |
| namespace _ { // private |
| |
| GzipOutputContext::GzipOutputContext(kj::Maybe<int> compressionLevel) { |
| int initResult; |
| |
| KJ_IF_MAYBE(level, compressionLevel) { |
| compressing = true; |
| initResult = |
| deflateInit2(&ctx, *level, Z_DEFLATED, |
| 15 + 16, // windowBits = 15 (maximum) + magic value 16 to ask for gzip. |
| 8, // memLevel = 8 (the default) |
| Z_DEFAULT_STRATEGY); |
| } else { |
| compressing = false; |
| initResult = inflateInit2(&ctx, 15 + 16); |
| } |
| |
| if (initResult != Z_OK) { |
| fail(initResult); |
| } |
| } |
| |
| GzipOutputContext::~GzipOutputContext() noexcept(false) { |
| compressing ? deflateEnd(&ctx) : inflateEnd(&ctx); |
| } |
| |
| void GzipOutputContext::setInput(const void* in, size_t size) { |
| ctx.next_in = const_cast<byte*>(reinterpret_cast<const byte*>(in)); |
| ctx.avail_in = size; |
| } |
| |
| kj::Tuple<bool, kj::ArrayPtr<const byte>> GzipOutputContext::pumpOnce(int flush) { |
| ctx.next_out = buffer; |
| ctx.avail_out = sizeof(buffer); |
| |
| auto result = compressing ? deflate(&ctx, flush) : inflate(&ctx, flush); |
| if (result != Z_OK && result != Z_BUF_ERROR && result != Z_STREAM_END) { |
| fail(result); |
| } |
| |
| // - Z_STREAM_END means we have finished the stream successfully. |
| // - Z_BUF_ERROR means we didn't have any more input to process |
| // (but still have to make a call to write to potentially flush data). |
| return kj::tuple(result == Z_OK, kj::arrayPtr(buffer, sizeof(buffer) - ctx.avail_out)); |
| } |
| |
| void GzipOutputContext::fail(int result) { |
| auto header = compressing ? "gzip compression failed" : "gzip decompression failed"; |
| if (ctx.msg == nullptr) { |
| KJ_FAIL_REQUIRE(header, result); |
| } else { |
| KJ_FAIL_REQUIRE(header, ctx.msg); |
| } |
| } |
| |
| } // namespace _ (private) |
| |
| GzipInputStream::GzipInputStream(InputStream& inner) |
| : inner(inner) { |
| // windowBits = 15 (maximum) + magic value 16 to ask for gzip. |
| KJ_ASSERT(inflateInit2(&ctx, 15 + 16) == Z_OK); |
| } |
| |
| GzipInputStream::~GzipInputStream() noexcept(false) { |
| inflateEnd(&ctx); |
| } |
| |
| size_t GzipInputStream::tryRead(void* out, size_t minBytes, size_t maxBytes) { |
| if (maxBytes == 0) return size_t(0); |
| |
| return readImpl(reinterpret_cast<byte*>(out), minBytes, maxBytes, 0); |
| } |
| |
| size_t GzipInputStream::readImpl( |
| byte* out, size_t minBytes, size_t maxBytes, size_t alreadyRead) { |
| if (ctx.avail_in == 0) { |
| size_t amount = inner.tryRead(buffer, 1, sizeof(buffer)); |
| if (amount == 0) { |
| if (!atValidEndpoint) { |
| KJ_FAIL_REQUIRE("gzip compressed stream ended prematurely"); |
| } |
| return alreadyRead; |
| } else { |
| ctx.next_in = buffer; |
| ctx.avail_in = amount; |
| } |
| } |
| |
| ctx.next_out = reinterpret_cast<byte*>(out); |
| ctx.avail_out = maxBytes; |
| |
| auto inflateResult = inflate(&ctx, Z_NO_FLUSH); |
| atValidEndpoint = inflateResult == Z_STREAM_END; |
| if (inflateResult == Z_OK || inflateResult == Z_STREAM_END) { |
| if (atValidEndpoint && ctx.avail_in > 0) { |
| // There's more data available. Assume start of new content. |
| KJ_ASSERT(inflateReset(&ctx) == Z_OK); |
| } |
| |
| size_t n = maxBytes - ctx.avail_out; |
| if (n >= minBytes) { |
| return n + alreadyRead; |
| } else { |
| return readImpl(out + n, minBytes - n, maxBytes - n, alreadyRead + n); |
| } |
| } else { |
| if (ctx.msg == nullptr) { |
| KJ_FAIL_REQUIRE("gzip decompression failed", inflateResult); |
| } else { |
| KJ_FAIL_REQUIRE("gzip decompression failed", ctx.msg); |
| } |
| } |
| } |
| |
| // ======================================================================================= |
| |
| GzipOutputStream::GzipOutputStream(OutputStream& inner, int compressionLevel) |
| : inner(inner), ctx(compressionLevel) {} |
| |
| GzipOutputStream::GzipOutputStream(OutputStream& inner, decltype(DECOMPRESS)) |
| : inner(inner), ctx(nullptr) {} |
| |
| GzipOutputStream::~GzipOutputStream() noexcept(false) { |
| pump(Z_FINISH); |
| } |
| |
| void GzipOutputStream::write(const void* in, size_t size) { |
| ctx.setInput(in, size); |
| pump(Z_NO_FLUSH); |
| } |
| |
| void GzipOutputStream::pump(int flush) { |
| bool ok; |
| do { |
| auto result = ctx.pumpOnce(flush); |
| ok = get<0>(result); |
| auto chunk = get<1>(result); |
| if (chunk.size() > 0) { |
| inner.write(chunk.begin(), chunk.size()); |
| } |
| } while (ok); |
| } |
| |
| // ======================================================================================= |
| |
| GzipAsyncInputStream::GzipAsyncInputStream(AsyncInputStream& inner) |
| : inner(inner) { |
| // windowBits = 15 (maximum) + magic value 16 to ask for gzip. |
| KJ_ASSERT(inflateInit2(&ctx, 15 + 16) == Z_OK); |
| } |
| |
| GzipAsyncInputStream::~GzipAsyncInputStream() noexcept(false) { |
| inflateEnd(&ctx); |
| } |
| |
| Promise<size_t> GzipAsyncInputStream::tryRead(void* out, size_t minBytes, size_t maxBytes) { |
| if (maxBytes == 0) return size_t(0); |
| |
| return readImpl(reinterpret_cast<byte*>(out), minBytes, maxBytes, 0); |
| } |
| |
| Promise<size_t> GzipAsyncInputStream::readImpl( |
| byte* out, size_t minBytes, size_t maxBytes, size_t alreadyRead) { |
| if (ctx.avail_in == 0) { |
| return inner.tryRead(buffer, 1, sizeof(buffer)) |
| .then([this,out,minBytes,maxBytes,alreadyRead](size_t amount) -> Promise<size_t> { |
| if (amount == 0) { |
| if (!atValidEndpoint) { |
| return KJ_EXCEPTION(DISCONNECTED, "gzip compressed stream ended prematurely"); |
| } |
| return alreadyRead; |
| } else { |
| ctx.next_in = buffer; |
| ctx.avail_in = amount; |
| return readImpl(out, minBytes, maxBytes, alreadyRead); |
| } |
| }); |
| } |
| |
| ctx.next_out = reinterpret_cast<byte*>(out); |
| ctx.avail_out = maxBytes; |
| |
| auto inflateResult = inflate(&ctx, Z_NO_FLUSH); |
| atValidEndpoint = inflateResult == Z_STREAM_END; |
| if (inflateResult == Z_OK || inflateResult == Z_STREAM_END) { |
| if (atValidEndpoint && ctx.avail_in > 0) { |
| // There's more data available. Assume start of new content. |
| KJ_ASSERT(inflateReset(&ctx) == Z_OK); |
| } |
| |
| size_t n = maxBytes - ctx.avail_out; |
| if (n >= minBytes) { |
| return n + alreadyRead; |
| } else { |
| return readImpl(out + n, minBytes - n, maxBytes - n, alreadyRead + n); |
| } |
| } else { |
| if (ctx.msg == nullptr) { |
| KJ_FAIL_REQUIRE("gzip decompression failed", inflateResult); |
| } else { |
| KJ_FAIL_REQUIRE("gzip decompression failed", ctx.msg); |
| } |
| } |
| } |
| |
| // ======================================================================================= |
| |
| GzipAsyncOutputStream::GzipAsyncOutputStream(AsyncOutputStream& inner, int compressionLevel) |
| : inner(inner), ctx(compressionLevel) {} |
| |
| GzipAsyncOutputStream::GzipAsyncOutputStream(AsyncOutputStream& inner, decltype(DECOMPRESS)) |
| : inner(inner), ctx(nullptr) {} |
| |
| Promise<void> GzipAsyncOutputStream::write(const void* in, size_t size) { |
| ctx.setInput(in, size); |
| return pump(Z_NO_FLUSH); |
| } |
| |
| Promise<void> GzipAsyncOutputStream::write(ArrayPtr<const ArrayPtr<const byte>> pieces) { |
| if (pieces.size() == 0) return kj::READY_NOW; |
| return write(pieces[0].begin(), pieces[0].size()) |
| .then([this,pieces]() { |
| return write(pieces.slice(1, pieces.size())); |
| }); |
| } |
| |
| kj::Promise<void> GzipAsyncOutputStream::pump(int flush) { |
| auto result = ctx.pumpOnce(flush); |
| auto ok = get<0>(result); |
| auto chunk = get<1>(result); |
| |
| if (chunk.size() == 0) { |
| if (ok) { |
| return pump(flush); |
| } else { |
| return kj::READY_NOW; |
| } |
| } else { |
| auto promise = inner.write(chunk.begin(), chunk.size()); |
| if (ok) { |
| promise = promise.then([this, flush]() { return pump(flush); }); |
| } |
| return promise; |
| } |
| } |
| |
| } // namespace kj |
| |
| #endif // KJ_HAS_ZLIB |