blob: 4db1d7cb46c0845a7afdd64eb6cb8bb3237fa050 [file] [log] [blame]
// Copyright (c) 2016 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#include "readiness-io.h"
#include <kj/test.h>
#include <stdlib.h>
namespace kj {
namespace {
// Pushes one three-byte message through a ReadyOutputStreamWrapper and verifies that a read
// started beforehand on the receiving end of the pipe completes with exactly those bytes.
KJ_TEST("readiness IO: write small") {
  auto ioContext = setupAsyncIo();
  auto oneWay = ioContext.provider->newOneWayPipe();

  // Start the read before anything is written; it needs at least 3 bytes to complete.
  char received[4];
  auto pendingRead = oneWay.in->read(received, 3, sizeof(received));

  ReadyOutputStreamWrapper writer(*oneWay.out);

  // The wrapper must report that it accepted the whole message.
  size_t accepted = KJ_ASSERT_NONNULL(writer.write(kj::StringPtr("foo").asBytes()));
  KJ_ASSERT(accepted == 3);

  size_t delivered = pendingRead.wait(ioContext.waitScope);
  KJ_ASSERT(delivered == 3);

  // NUL-terminate so the received bytes can be compared as a string.
  received[3] = '\0';
  KJ_ASSERT(kj::StringPtr(received) == "foo");
}
// Keeps writing the three-byte chunk "bar" until the wrapper accepts only part of a chunk, then
// reads everything back from the pipe and checks that the repeating pattern arrived intact.
KJ_TEST("readiness IO: write many odd") {
  auto ioContext = setupAsyncIo();
  auto oneWay = ioContext.provider->newOneWayPipe();
  ReadyOutputStreamWrapper writer(*oneWay.out);

  const kj::StringPtr chunk("bar");
  size_t bytesAccepted = 0;
  bool sawShortWrite = false;
  while (!sawShortWrite) {
    auto outcome = writer.write(chunk.asBytes());
    KJ_IF_MAYBE(count, outcome) {
      bytesAccepted += *count;
      // A partially accepted chunk is the expected way for this loop to end.
      sawShortWrite = *count < 3;
    } else {
      // The test expects the wrapper to fill up in the middle of a chunk, never exactly
      // on a chunk boundary.
      KJ_FAIL_ASSERT("pipe buffer is divisible by 3? really?");
    }
  }

  // One spare byte so the read's maximum is larger than the amount we expect.
  auto received = kj::heapArray<char>(bytesAccepted + 1);
  auto readPromise = oneWay.in->read(received.begin(), bytesAccepted, received.size());
  size_t bytesRead = readPromise.wait(ioContext.waitScope);
  KJ_ASSERT(bytesRead == bytesAccepted);

  for (size_t pos = 0; pos < bytesAccepted; ++pos) {
    KJ_ASSERT(received[pos] == "bar"[pos % 3]);
  }
}
// Keeps writing the two-byte chunk "ba" until the wrapper refuses to accept any more, then
// reads everything back from the pipe and checks that the repeating pattern arrived intact.
KJ_TEST("readiness IO: write even") {
  auto ioContext = setupAsyncIo();
  auto oneWay = ioContext.provider->newOneWayPipe();
  ReadyOutputStreamWrapper writer(*oneWay.out);

  const kj::StringPtr chunk("ba");
  size_t bytesAccepted = 0;
  bool writerFull = false;
  while (!writerFull) {
    auto outcome = writer.write(chunk.asBytes());
    KJ_IF_MAYBE(count, outcome) {
      bytesAccepted += *count;
      if (*count < 2) {
        // The test expects the wrapper to fill up exactly on a chunk boundary, so a
        // partially accepted chunk is a failure here.
        KJ_FAIL_ASSERT("pipe buffer is not divisible by 2? really?");
      }
    } else {
      // Nothing was accepted: this is the expected way for the loop to end.
      writerFull = true;
    }
  }

  // One spare byte so the read's maximum is larger than the amount we expect.
  auto received = kj::heapArray<char>(bytesAccepted + 1);
  auto readPromise = oneWay.in->read(received.begin(), bytesAccepted, received.size());
  size_t bytesRead = readPromise.wait(ioContext.waitScope);
  KJ_ASSERT(bytesRead == bytesAccepted);

  for (size_t pos = 0; pos < bytesAccepted; ++pos) {
    KJ_ASSERT(received[pos] == "ba"[pos % 2]);
  }
}
// Checks that data written while a cork is held is not seen by the reader, and that once the
// cork is released both writes are delivered to the pending read together.
KJ_TEST("readiness IO: write while corked") {
  auto ioContext = setupAsyncIo();
  auto oneWay = ioContext.provider->newOneWayPipe();

  // Start the read up front; it needs at least 3 bytes to complete.
  char received[7];
  auto pendingRead = oneWay.in->read(received, 3, sizeof(received));

  ReadyOutputStreamWrapper writer(*oneWay.out);
  auto cork = writer.cork();

  // The first chunk is accepted in full, yet nothing reaches the reader.
  size_t firstAccepted = KJ_ASSERT_NONNULL(writer.write(kj::StringPtr("foo").asBytes()));
  KJ_ASSERT(firstAccepted == 3);
  KJ_ASSERT(!pendingRead.poll(ioContext.waitScope));

  // Same for a second chunk: accepted, but still not flushed out.
  size_t secondAccepted = KJ_ASSERT_NONNULL(writer.write(kj::StringPtr("bar").asBytes()));
  KJ_ASSERT(secondAccepted == 3);
  KJ_ASSERT(!pendingRead.poll(ioContext.waitScope));

  // Release the cork by moving it into a short-lived local. The scope is opened with
  // `if (true) {` rather than a bare `{` to avoid g++-8 warnings about the block not being
  // treated as part of KJ_ASSERT's internal `for` loop.
  if (true) {
    auto released = kj::mv(cork);
  }

  // With pumping re-enabled the read completes with both chunks.
  size_t delivered = pendingRead.wait(ioContext.waitScope);
  KJ_ASSERT(delivered == 6);

  // NUL-terminate so the received bytes can be compared as a string.
  received[6] = '\0';
  KJ_ASSERT(kj::StringPtr(received) == "foobar");
}
// Same scenario as the uncorked "write many odd" test, but with a cork held for the whole test.
// Filling the wrapper should still get the data pumped to the pipe, while a small write made
// afterwards must stay buffered.
KJ_TEST("readiness IO: write many odd while corked") {
  auto ioContext = setupAsyncIo();
  auto oneWay = ioContext.provider->newOneWayPipe();

  ReadyOutputStreamWrapper writer(*oneWay.out);
  auto cork = writer.cork();

  // Write three-byte chunks until one is only partially accepted.
  size_t bytesAccepted = 0;
  bool sawShortWrite = false;
  while (!sawShortWrite) {
    auto outcome = writer.write(kj::StringPtr("bar").asBytes());
    KJ_IF_MAYBE(count, outcome) {
      bytesAccepted += *count;
      sawShortWrite = *count < 3;
    } else {
      // The test expects the wrapper to fill up in the middle of a chunk, never exactly
      // on a chunk boundary.
      KJ_FAIL_ASSERT("pipe buffer is divisible by 3? really?");
    }
  }

  // Everything accepted so far must be readable from the pipe despite the cork.
  auto received = kj::heapArray<char>(bytesAccepted + 1);
  auto drainPromise = oneWay.in->read(received.begin(), bytesAccepted, received.size());
  size_t bytesRead = drainPromise.wait(ioContext.waitScope);
  KJ_ASSERT(bytesRead == bytesAccepted);

  for (size_t pos = 0; pos < bytesAccepted; ++pos) {
    KJ_ASSERT(received[pos] == "bar"[pos % 3]);
  }

  // Eager pumping should still be corked: one more small write is accepted in full but does
  // not show up on the reading side.
  size_t extraAccepted = KJ_ASSERT_NONNULL(writer.write(kj::StringPtr("bar").asBytes()));
  KJ_ASSERT(extraAccepted == 3);
  auto stalledRead = oneWay.in->read(received.begin(), 3, received.size());
  KJ_ASSERT(!stalledRead.poll(ioContext.waitScope));
}
// Same scenario as the uncorked "write even" test, but with a cork held for the whole test.
// Filling the wrapper should still get the data pumped to the pipe, while a small write made
// afterwards must stay buffered.
KJ_TEST("readiness IO: write many even while corked") {
  auto ioContext = setupAsyncIo();
  auto oneWay = ioContext.provider->newOneWayPipe();

  ReadyOutputStreamWrapper writer(*oneWay.out);
  auto cork = writer.cork();

  // Write two-byte chunks until the wrapper refuses to accept anything more.
  size_t bytesAccepted = 0;
  bool writerFull = false;
  while (!writerFull) {
    auto outcome = writer.write(kj::StringPtr("ba").asBytes());
    KJ_IF_MAYBE(count, outcome) {
      bytesAccepted += *count;
      if (*count < 2) {
        // The test expects the wrapper to fill up exactly on a chunk boundary, so a
        // partially accepted chunk is a failure here.
        KJ_FAIL_ASSERT("pipe buffer is not divisible by 2? really?");
      }
    } else {
      writerFull = true;
    }
  }

  // Everything accepted so far must be readable from the pipe despite the cork.
  auto received = kj::heapArray<char>(bytesAccepted + 1);
  auto drainPromise = oneWay.in->read(received.begin(), bytesAccepted, received.size());
  size_t bytesRead = drainPromise.wait(ioContext.waitScope);
  KJ_ASSERT(bytesRead == bytesAccepted);

  for (size_t pos = 0; pos < bytesAccepted; ++pos) {
    KJ_ASSERT(received[pos] == "ba"[pos % 2]);
  }

  // Eager pumping should still be corked: one more small write is accepted in full but does
  // not show up on the reading side.
  size_t extraAccepted = KJ_ASSERT_NONNULL(writer.write(kj::StringPtr("ba").asBytes()));
  KJ_ASSERT(extraAccepted == 2);
  auto stalledRead = oneWay.in->read(received.begin(), 2, received.size());
  KJ_ASSERT(!stalledRead.poll(ioContext.waitScope));
}
// Reads a single three-byte message through a ReadyInputStreamWrapper: the first read finds
// nothing, the read after whenReady() returns the message, and once the write end is dropped
// the wrapper eventually reports a zero-length read.
KJ_TEST("readiness IO: read small") {
  auto ioContext = setupAsyncIo();
  auto oneWay = ioContext.provider->newOneWayPipe();
  ReadyInputStreamWrapper reader(*oneWay.in);

  char received[4];
  auto window = kj::ArrayPtr<char>(received).asBytes();

  // Nothing has been written yet, so the first attempt yields no result.
  KJ_ASSERT(reader.read(window) == nullptr);

  oneWay.out->write("foo", 3).wait(ioContext.waitScope);
  reader.whenReady().wait(ioContext.waitScope);

  size_t delivered = KJ_ASSERT_NONNULL(reader.read(window));
  KJ_ASSERT(delivered == 3);

  // NUL-terminate so the received bytes can be compared as a string.
  received[3] = '\0';
  KJ_ASSERT(kj::StringPtr(received) == "foo");

  // Drop the write end, then keep polling until the wrapper produces a result, which must be
  // a zero-length read.
  oneWay.out = nullptr;

  bool reachedEnd = false;
  while (!reachedEnd) {
    auto lastRead = reader.read(window);
    KJ_IF_MAYBE(count, lastRead) {
      KJ_ASSERT(*count == 0);
      reachedEnd = true;
    } else {
      reader.whenReady().wait(ioContext.waitScope);
    }
  }
}
// Streams 8192 bytes of a repeating "bar" pattern through the pipe and consumes them three
// bytes at a time via ReadyInputStreamWrapper. Since 8192 is not a multiple of 3, the stream
// is expected to end with a short (non-empty) read, followed by a zero-length read.
KJ_TEST("readiness IO: read many odd") {
  auto ioContext = setupAsyncIo();
  auto oneWay = ioContext.provider->newOneWayPipe();

  char payload[8192];
  for (size_t pos = 0; pos < sizeof(payload); ++pos) {
    payload[pos] = "bar"[pos % 3];
  }

  // Write everything, then drop the write end once the write has completed. The task is kept
  // in a local so that it stays alive for the rest of the test.
  auto writeTask = oneWay.out->write(payload, sizeof(payload)).then([&oneWay]() {
    // shutdown
    oneWay.out = nullptr;
  }).eagerlyEvaluate(nullptr);

  ReadyInputStreamWrapper reader(*oneWay.in);
  char received[3];
  auto window = kj::ArrayPtr<char>(received).asBytes();

  bool sawShortRead = false;
  while (!sawShortRead) {
    auto outcome = reader.read(window);
    KJ_IF_MAYBE(count, outcome) {
      // A zero-length read here would mean the stream ended on a chunk boundary.
      KJ_ASSERT(*count != 0, "ended at wrong spot");
      for (size_t pos = 0; pos < *count; ++pos) {
        KJ_ASSERT(received[pos] == "bar"[pos]);
      }
      // A partial chunk is the expected way for this loop to end.
      sawShortRead = *count < 3;
    } else {
      reader.whenReady().wait(ioContext.waitScope);
    }
  }

  // After the short read, keep polling until a result arrives; it must be a zero-length read.
  bool reachedEnd = false;
  while (!reachedEnd) {
    auto lastRead = reader.read(window);
    KJ_IF_MAYBE(count, lastRead) {
      KJ_ASSERT(*count == 0);
      reachedEnd = true;
    } else {
      reader.whenReady().wait(ioContext.waitScope);
    }
  }
}
// Streams 8192 bytes of a repeating "ba" pattern through the pipe and consumes them two bytes
// at a time via ReadyInputStreamWrapper. Every non-empty read is expected to be a full chunk,
// and the stream is expected to end with a zero-length read that keeps repeating.
KJ_TEST("readiness IO: read many even") {
  auto ioContext = setupAsyncIo();
  auto oneWay = ioContext.provider->newOneWayPipe();

  char payload[8192];
  for (size_t pos = 0; pos < sizeof(payload); ++pos) {
    payload[pos] = "ba"[pos % 2];
  }

  // Write everything, then drop the write end once the write has completed. The task is kept
  // in a local so that it stays alive for the rest of the test.
  auto writeTask = oneWay.out->write(payload, sizeof(payload)).then([&oneWay]() {
    // shutdown
    oneWay.out = nullptr;
  }).eagerlyEvaluate(nullptr);

  ReadyInputStreamWrapper reader(*oneWay.in);
  char received[2];
  auto window = kj::ArrayPtr<char>(received).asBytes();

  bool streamEnded = false;
  while (!streamEnded) {
    auto outcome = reader.read(window);
    KJ_IF_MAYBE(count, outcome) {
      if (*count == 0) {
        // A zero-length read is the expected way for this loop to end.
        streamEnded = true;
      } else {
        for (size_t pos = 0; pos < *count; ++pos) {
          KJ_ASSERT(received[pos] == "ba"[pos]);
        }
        KJ_ASSERT(*count == 2, "ended at wrong spot");
      }
    } else {
      reader.whenReady().wait(ioContext.waitScope);
    }
  }

  // Reading again after the end must keep producing a zero-length read.
  bool reachedEnd = false;
  while (!reachedEnd) {
    auto lastRead = reader.read(window);
    KJ_IF_MAYBE(count, lastRead) {
      KJ_ASSERT(*count == 0);
      reachedEnd = true;
    } else {
      reader.whenReady().wait(ioContext.waitScope);
    }
  }
}
} // namespace
} // namespace kj