blob: ca85feba355674af74b6a33bc33b07a8e0143cd9 [file] [log] [blame]
// Copyright (c) 2016 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#include "readiness-io.h"
namespace kj {
static size_t copyInto(kj::ArrayPtr<byte> dst, kj::ArrayPtr<const byte>& src) {
size_t n = kj::min(dst.size(), src.size());
memcpy(dst.begin(), src.begin(), n);
src = src.slice(n, src.size());
return n;
}
// =======================================================================================
ReadyInputStreamWrapper::ReadyInputStreamWrapper(AsyncInputStream& input): input(input) {}
ReadyInputStreamWrapper::~ReadyInputStreamWrapper() noexcept(false) {}
kj::Maybe<size_t> ReadyInputStreamWrapper::read(kj::ArrayPtr<byte> dst) {
if (eof || dst.size() == 0) return size_t(0);
if (content.size() == 0) {
// No data available. Try to read more.
if (!isPumping) {
isPumping = true;
pumpTask = kj::evalNow([&]() {
return input.tryRead(buffer, 1, sizeof(buffer)).then([this](size_t n) {
if (n == 0) {
eof = true;
} else {
content = kj::arrayPtr(buffer, n);
}
isPumping = false;
});
}).fork();
}
return nullptr;
}
return copyInto(dst, content);
}
kj::Promise<void> ReadyInputStreamWrapper::whenReady() {
return pumpTask.addBranch();
}
// =======================================================================================
ReadyOutputStreamWrapper::ReadyOutputStreamWrapper(AsyncOutputStream& output): output(output) {}
ReadyOutputStreamWrapper::~ReadyOutputStreamWrapper() noexcept(false) {}
kj::Maybe<size_t> ReadyOutputStreamWrapper::write(kj::ArrayPtr<const byte> data) {
if (data.size() == 0) return size_t(0);
if (filled == sizeof(buffer)) {
// No space.
return nullptr;
}
uint end = start + filled;
size_t result = 0;
if (end < sizeof(buffer)) {
// The filled part of the buffer is somewhere in the middle.
// Copy into space after filled space.
result += copyInto(kj::arrayPtr(buffer + end, buffer + sizeof(buffer)), data);
// Copy into space before filled space.
result += copyInto(kj::arrayPtr(buffer, buffer + start), data);
} else {
// Fill currently loops, so we only have one segment of empty space to copy into.
// Copy into the space between the fill's end and the fill's start.
result += copyInto(kj::arrayPtr(buffer + end % sizeof(buffer), buffer + start), data);
}
filled += result;
if (!isPumping && (!corked || filled == sizeof(buffer))) {
isPumping = true;
pumpTask = kj::evalNow([&]() {
return pump();
}).fork();
}
return result;
}
kj::Promise<void> ReadyOutputStreamWrapper::whenReady() {
return pumpTask.addBranch();
}
ReadyOutputStreamWrapper::Cork ReadyOutputStreamWrapper::cork() {
corked = true;
return Cork(*this);
}
void ReadyOutputStreamWrapper::uncork() {
corked = false;
if (!isPumping && filled > 0) {
isPumping = true;
pumpTask = kj::evalNow([&]() {
return pump();
}).fork();
}
}
kj::Promise<void> ReadyOutputStreamWrapper::pump() {
uint oldFilled = filled;
uint end = start + filled;
kj::Promise<void> promise = nullptr;
if (end <= sizeof(buffer)) {
promise = output.write(buffer + start, filled);
} else {
end = end % sizeof(buffer);
segments[0] = kj::arrayPtr(buffer + start, buffer + sizeof(buffer));
segments[1] = kj::arrayPtr(buffer, buffer + end);
promise = output.write(segments);
}
return promise.then([this,oldFilled,end]() -> kj::Promise<void> {
filled -= oldFilled;
start = end;
if (filled > 0) {
return pump();
} else {
isPumping = false;
// As a small optimization, reset to the start of the buffer when it's empty so we can provide
// the underlying layer just one contiguous chunk of memory instead of two when possible.
start = 0;
return kj::READY_NOW;
}
});
}
} // namespace kj