blob: 59d12e58639fb65591fa82417eccf3f85487a008 [file] [log] [blame]
// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#if _WIN32
#include "win32-api-version.h"
#endif
#include "io.h"
#include "debug.h"
#include "miniposix.h"
#include <algorithm>
#include <errno.h>
#include "vector.h"
#if _WIN32
#include <windows.h>
#include "windows-sanity.h"
#else
#include <sys/uio.h>
#endif
namespace kj {
InputStream::~InputStream() noexcept(false) {}
OutputStream::~OutputStream() noexcept(false) {}
BufferedInputStream::~BufferedInputStream() noexcept(false) {}
BufferedOutputStream::~BufferedOutputStream() noexcept(false) {}
size_t InputStream::read(void* buffer, size_t minBytes, size_t maxBytes) {
size_t n = tryRead(buffer, minBytes, maxBytes);
KJ_REQUIRE(n >= minBytes, "Premature EOF") {
// Pretend we read zeros from the input.
memset(reinterpret_cast<byte*>(buffer) + n, 0, minBytes - n);
return minBytes;
}
return n;
}
void InputStream::skip(size_t bytes) {
char scratch[8192];
while (bytes > 0) {
size_t amount = std::min(bytes, sizeof(scratch));
read(scratch, amount);
bytes -= amount;
}
}
namespace {
Array<byte> readAll(InputStream& input, uint64_t limit, bool nulTerminate) {
Vector<Array<byte>> parts;
constexpr size_t BLOCK_SIZE = 4096;
for (;;) {
KJ_REQUIRE(limit > 0, "Reached limit before EOF.");
auto part = heapArray<byte>(kj::min(BLOCK_SIZE, limit));
size_t n = input.tryRead(part.begin(), part.size(), part.size());
limit -= n;
if (n < part.size()) {
auto result = heapArray<byte>(parts.size() * BLOCK_SIZE + n + nulTerminate);
byte* pos = result.begin();
for (auto& p: parts) {
memcpy(pos, p.begin(), BLOCK_SIZE);
pos += BLOCK_SIZE;
}
memcpy(pos, part.begin(), n);
pos += n;
if (nulTerminate) *pos++ = '\0';
KJ_ASSERT(pos == result.end());
return result;
} else {
parts.add(kj::mv(part));
}
}
}
} // namespace
String InputStream::readAllText(uint64_t limit) {
return String(readAll(*this, limit, true).releaseAsChars());
}
Array<byte> InputStream::readAllBytes(uint64_t limit) {
return readAll(*this, limit, false);
}
void OutputStream::write(ArrayPtr<const ArrayPtr<const byte>> pieces) {
for (auto piece: pieces) {
write(piece.begin(), piece.size());
}
}
ArrayPtr<const byte> BufferedInputStream::getReadBuffer() {
auto result = tryGetReadBuffer();
KJ_REQUIRE(result.size() > 0, "Premature EOF");
return result;
}
// =======================================================================================
BufferedInputStreamWrapper::BufferedInputStreamWrapper(InputStream& inner, ArrayPtr<byte> buffer)
: inner(inner), ownedBuffer(buffer == nullptr ? heapArray<byte>(8192) : nullptr),
buffer(buffer == nullptr ? ownedBuffer : buffer) {}
BufferedInputStreamWrapper::~BufferedInputStreamWrapper() noexcept(false) {}
ArrayPtr<const byte> BufferedInputStreamWrapper::tryGetReadBuffer() {
if (bufferAvailable.size() == 0) {
size_t n = inner.tryRead(buffer.begin(), 1, buffer.size());
bufferAvailable = buffer.slice(0, n);
}
return bufferAvailable;
}
size_t BufferedInputStreamWrapper::tryRead(void* dst, size_t minBytes, size_t maxBytes) {
if (minBytes <= bufferAvailable.size()) {
// Serve from current buffer.
size_t n = std::min(bufferAvailable.size(), maxBytes);
memcpy(dst, bufferAvailable.begin(), n);
bufferAvailable = bufferAvailable.slice(n, bufferAvailable.size());
return n;
} else {
// Copy current available into destination.
memcpy(dst, bufferAvailable.begin(), bufferAvailable.size());
size_t fromFirstBuffer = bufferAvailable.size();
dst = reinterpret_cast<byte*>(dst) + fromFirstBuffer;
minBytes -= fromFirstBuffer;
maxBytes -= fromFirstBuffer;
if (maxBytes <= buffer.size()) {
// Read the next buffer-full.
size_t n = inner.read(buffer.begin(), minBytes, buffer.size());
size_t fromSecondBuffer = std::min(n, maxBytes);
memcpy(dst, buffer.begin(), fromSecondBuffer);
bufferAvailable = buffer.slice(fromSecondBuffer, n);
return fromFirstBuffer + fromSecondBuffer;
} else {
// Forward large read to the underlying stream.
bufferAvailable = nullptr;
return fromFirstBuffer + inner.read(dst, minBytes, maxBytes);
}
}
}
void BufferedInputStreamWrapper::skip(size_t bytes) {
if (bytes <= bufferAvailable.size()) {
bufferAvailable = bufferAvailable.slice(bytes, bufferAvailable.size());
} else {
bytes -= bufferAvailable.size();
if (bytes <= buffer.size()) {
// Read the next buffer-full.
size_t n = inner.read(buffer.begin(), bytes, buffer.size());
bufferAvailable = buffer.slice(bytes, n);
} else {
// Forward large skip to the underlying stream.
bufferAvailable = nullptr;
inner.skip(bytes);
}
}
}
// -------------------------------------------------------------------
BufferedOutputStreamWrapper::BufferedOutputStreamWrapper(OutputStream& inner, ArrayPtr<byte> buffer)
: inner(inner),
ownedBuffer(buffer == nullptr ? heapArray<byte>(8192) : nullptr),
buffer(buffer == nullptr ? ownedBuffer : buffer),
bufferPos(this->buffer.begin()) {}
BufferedOutputStreamWrapper::~BufferedOutputStreamWrapper() noexcept(false) {
unwindDetector.catchExceptionsIfUnwinding([&]() {
flush();
});
}
void BufferedOutputStreamWrapper::flush() {
if (bufferPos > buffer.begin()) {
inner.write(buffer.begin(), bufferPos - buffer.begin());
bufferPos = buffer.begin();
}
}
ArrayPtr<byte> BufferedOutputStreamWrapper::getWriteBuffer() {
return arrayPtr(bufferPos, buffer.end());
}
void BufferedOutputStreamWrapper::write(const void* src, size_t size) {
if (src == bufferPos) {
// Oh goody, the caller wrote directly into our buffer.
bufferPos += size;
} else {
size_t available = buffer.end() - bufferPos;
if (size <= available) {
memcpy(bufferPos, src, size);
bufferPos += size;
} else if (size <= buffer.size()) {
// Too much for this buffer, but not a full buffer's worth, so we'll go ahead and copy.
memcpy(bufferPos, src, available);
inner.write(buffer.begin(), buffer.size());
size -= available;
src = reinterpret_cast<const byte*>(src) + available;
memcpy(buffer.begin(), src, size);
bufferPos = buffer.begin() + size;
} else {
// Writing so much data that we might as well write directly to avoid a copy.
inner.write(buffer.begin(), bufferPos - buffer.begin());
bufferPos = buffer.begin();
inner.write(src, size);
}
}
}
// =======================================================================================
ArrayInputStream::ArrayInputStream(ArrayPtr<const byte> array): array(array) {}
ArrayInputStream::~ArrayInputStream() noexcept(false) {}
ArrayPtr<const byte> ArrayInputStream::tryGetReadBuffer() {
return array;
}
size_t ArrayInputStream::tryRead(void* dst, size_t minBytes, size_t maxBytes) {
size_t n = std::min(maxBytes, array.size());
memcpy(dst, array.begin(), n);
array = array.slice(n, array.size());
return n;
}
void ArrayInputStream::skip(size_t bytes) {
KJ_REQUIRE(array.size() >= bytes, "ArrayInputStream ended prematurely.") {
bytes = array.size();
break;
}
array = array.slice(bytes, array.size());
}
// -------------------------------------------------------------------
ArrayOutputStream::ArrayOutputStream(ArrayPtr<byte> array): array(array), fillPos(array.begin()) {}
ArrayOutputStream::~ArrayOutputStream() noexcept(false) {}
ArrayPtr<byte> ArrayOutputStream::getWriteBuffer() {
return arrayPtr(fillPos, array.end());
}
void ArrayOutputStream::write(const void* src, size_t size) {
if (src == fillPos && fillPos != array.end()) {
// Oh goody, the caller wrote directly into our buffer.
KJ_REQUIRE(size <= array.end() - fillPos, size, fillPos, array.end() - fillPos);
fillPos += size;
} else {
KJ_REQUIRE(size <= (size_t)(array.end() - fillPos),
"ArrayOutputStream's backing array was not large enough for the data written.");
memcpy(fillPos, src, size);
fillPos += size;
}
}
// -------------------------------------------------------------------
VectorOutputStream::VectorOutputStream(size_t initialCapacity)
: vector(heapArray<byte>(initialCapacity)), fillPos(vector.begin()) {}
VectorOutputStream::~VectorOutputStream() noexcept(false) {}
ArrayPtr<byte> VectorOutputStream::getWriteBuffer() {
// Grow if needed.
if (fillPos == vector.end()) {
grow(vector.size() + 1);
}
return arrayPtr(fillPos, vector.end());
}
void VectorOutputStream::write(const void* src, size_t size) {
if (src == fillPos && fillPos != vector.end()) {
// Oh goody, the caller wrote directly into our buffer.
KJ_REQUIRE(size <= vector.end() - fillPos, size, fillPos, vector.end() - fillPos);
fillPos += size;
} else {
if (vector.end() - fillPos < size) {
grow(fillPos - vector.begin() + size);
}
memcpy(fillPos, src, size);
fillPos += size;
}
}
void VectorOutputStream::grow(size_t minSize) {
size_t newSize = vector.size() * 2;
while (newSize < minSize) newSize *= 2;
auto newVector = heapArray<byte>(newSize);
memcpy(newVector.begin(), vector.begin(), fillPos - vector.begin());
fillPos = fillPos - vector.begin() + newVector.begin();
vector = kj::mv(newVector);
}
// =======================================================================================
AutoCloseFd::~AutoCloseFd() noexcept(false) {
if (fd >= 0) {
// Don't use SYSCALL() here because close() should not be repeated on EINTR.
if (miniposix::close(fd) < 0) {
KJ_FAIL_SYSCALL("close", errno, fd) {
// This ensures we don't throw an exception if unwinding.
break;
}
}
}
}
FdInputStream::~FdInputStream() noexcept(false) {}
size_t FdInputStream::tryRead(void* buffer, size_t minBytes, size_t maxBytes) {
byte* pos = reinterpret_cast<byte*>(buffer);
byte* min = pos + minBytes;
byte* max = pos + maxBytes;
while (pos < min) {
miniposix::ssize_t n;
KJ_SYSCALL(n = miniposix::read(fd, pos, max - pos), fd);
if (n == 0) {
break;
}
pos += n;
}
return pos - reinterpret_cast<byte*>(buffer);
}
FdOutputStream::~FdOutputStream() noexcept(false) {}
void FdOutputStream::write(const void* buffer, size_t size) {
const char* pos = reinterpret_cast<const char*>(buffer);
while (size > 0) {
miniposix::ssize_t n;
KJ_SYSCALL(n = miniposix::write(fd, pos, size), fd);
KJ_ASSERT(n > 0, "write() returned zero.");
pos += n;
size -= n;
}
}
void FdOutputStream::write(ArrayPtr<const ArrayPtr<const byte>> pieces) {
#if _WIN32
// Windows has no reasonable writev(). It has WriteFileGather, but this call has the unreasonable
// restriction that each segment must be page-aligned. So, fall back to the default implementation
OutputStream::write(pieces);
#else
const size_t iovmax = miniposix::iovMax();
while (pieces.size() > iovmax) {
write(pieces.slice(0, iovmax));
pieces = pieces.slice(iovmax, pieces.size());
}
KJ_STACK_ARRAY(struct iovec, iov, pieces.size(), 16, 128);
for (uint i = 0; i < pieces.size(); i++) {
// writev() interface is not const-correct. :(
iov[i].iov_base = const_cast<byte*>(pieces[i].begin());
iov[i].iov_len = pieces[i].size();
}
struct iovec* current = iov.begin();
// Advance past any leading empty buffers so that a write full of only empty buffers does not
// cause a syscall at all.
while (current < iov.end() && current->iov_len == 0) {
++current;
}
while (current < iov.end()) {
// Issue the write.
ssize_t n = 0;
KJ_SYSCALL(n = ::writev(fd, current, iov.end() - current), fd);
KJ_ASSERT(n > 0, "writev() returned zero.");
// Advance past all buffers that were fully-written.
while (current < iov.end() && static_cast<size_t>(n) >= current->iov_len) {
n -= current->iov_len;
++current;
}
// If we only partially-wrote one of the buffers, adjust the pointer and size to include only
// the unwritten part.
if (n > 0) {
current->iov_base = reinterpret_cast<byte*>(current->iov_base) + n;
current->iov_len -= n;
}
}
#endif
}
// =======================================================================================
#if _WIN32
AutoCloseHandle::~AutoCloseHandle() noexcept(false) {
if (handle != (void*)-1) {
KJ_WIN32(CloseHandle(handle));
}
}
HandleInputStream::~HandleInputStream() noexcept(false) {}
size_t HandleInputStream::tryRead(void* buffer, size_t minBytes, size_t maxBytes) {
byte* pos = reinterpret_cast<byte*>(buffer);
byte* min = pos + minBytes;
byte* max = pos + maxBytes;
while (pos < min) {
DWORD n;
KJ_WIN32(ReadFile(handle, pos, kj::min(max - pos, DWORD(kj::maxValue)), &n, nullptr));
if (n == 0) {
break;
}
pos += n;
}
return pos - reinterpret_cast<byte*>(buffer);
}
HandleOutputStream::~HandleOutputStream() noexcept(false) {}
void HandleOutputStream::write(const void* buffer, size_t size) {
const char* pos = reinterpret_cast<const char*>(buffer);
while (size > 0) {
DWORD n;
KJ_WIN32(WriteFile(handle, pos, kj::min(size, DWORD(kj::maxValue)), &n, nullptr));
KJ_ASSERT(n > 0, "write() returned zero.");
pos += n;
size -= n;
}
}
#endif // _WIN32
} // namespace kj