blob: afb91ef93234fd1449d2d6a3bac423eb27d299cb [file] [log] [blame]
// Copyright (c) 2017 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#define KJ_TESTING_KJ 1
#include "http.h"
#include <kj/debug.h>
#include <kj/test.h>
#include <kj/encoding.h>
#include <map>
#if KJ_HTTP_TEST_USE_OS_PIPE
// Run the test using OS-level socketpairs. (See http-socketpair-test.c++.)
#define KJ_HTTP_TEST_SETUP_IO \
auto io = kj::setupAsyncIo(); \
auto& waitScope KJ_UNUSED = io.waitScope
#define KJ_HTTP_TEST_SETUP_LOOPBACK_LISTENER_AND_ADDR \
auto listener = io.provider->getNetwork().parseAddress("localhost", 0) \
.wait(waitScope)->listen(); \
auto addr = io.provider->getNetwork().parseAddress("localhost", listener->getPort()) \
.wait(waitScope)
#define KJ_HTTP_TEST_CREATE_2PIPE \
io.provider->newTwoWayPipe()
#else
// Run the test using in-process two-way pipes.
#define KJ_HTTP_TEST_SETUP_IO \
kj::EventLoop eventLoop; \
kj::WaitScope waitScope(eventLoop)
#define KJ_HTTP_TEST_SETUP_LOOPBACK_LISTENER_AND_ADDR \
auto capPipe = newCapabilityPipe(); \
auto listener = kj::heap<kj::CapabilityStreamConnectionReceiver>(*capPipe.ends[0]); \
auto addr = kj::heap<kj::CapabilityStreamNetworkAddress>(nullptr, *capPipe.ends[1])
#define KJ_HTTP_TEST_CREATE_2PIPE \
kj::newTwoWayPipe()
#endif
namespace kj {
namespace {
KJ_TEST("HttpMethod parse / stringify") {
#define TRY(name) \
KJ_EXPECT(kj::str(HttpMethod::name) == #name); \
KJ_IF_MAYBE(parsed, tryParseHttpMethod(#name)) { \
KJ_EXPECT(*parsed == HttpMethod::name); \
} else { \
KJ_FAIL_EXPECT("couldn't parse \"" #name "\" as HttpMethod"); \
}
KJ_HTTP_FOR_EACH_METHOD(TRY)
#undef TRY
KJ_EXPECT(tryParseHttpMethod("FOO") == nullptr);
KJ_EXPECT(tryParseHttpMethod("") == nullptr);
KJ_EXPECT(tryParseHttpMethod("G") == nullptr);
KJ_EXPECT(tryParseHttpMethod("GE") == nullptr);
KJ_EXPECT(tryParseHttpMethod("GET ") == nullptr);
KJ_EXPECT(tryParseHttpMethod("get") == nullptr);
}
KJ_TEST("HttpHeaderTable") {
HttpHeaderTable::Builder builder;
auto host = builder.add("Host");
auto host2 = builder.add("hOsT");
auto fooBar = builder.add("Foo-Bar");
auto bazQux = builder.add("baz-qux");
auto bazQux2 = builder.add("Baz-Qux");
auto table = builder.build();
uint builtinHeaderCount = 0;
#define INCREMENT(id, name) ++builtinHeaderCount;
KJ_HTTP_FOR_EACH_BUILTIN_HEADER(INCREMENT)
#undef INCREMENT
KJ_EXPECT(table->idCount() == builtinHeaderCount + 2);
KJ_EXPECT(host == HttpHeaderId::HOST);
KJ_EXPECT(host != HttpHeaderId::DATE);
KJ_EXPECT(host2 == host);
KJ_EXPECT(host != fooBar);
KJ_EXPECT(host != bazQux);
KJ_EXPECT(fooBar != bazQux);
KJ_EXPECT(bazQux == bazQux2);
KJ_EXPECT(kj::str(host) == "Host");
KJ_EXPECT(kj::str(host2) == "Host");
KJ_EXPECT(kj::str(fooBar) == "Foo-Bar");
KJ_EXPECT(kj::str(bazQux) == "baz-qux");
KJ_EXPECT(kj::str(HttpHeaderId::HOST) == "Host");
KJ_EXPECT(table->idToString(HttpHeaderId::DATE) == "Date");
KJ_EXPECT(table->idToString(fooBar) == "Foo-Bar");
KJ_EXPECT(KJ_ASSERT_NONNULL(table->stringToId("Date")) == HttpHeaderId::DATE);
KJ_EXPECT(KJ_ASSERT_NONNULL(table->stringToId("dATE")) == HttpHeaderId::DATE);
KJ_EXPECT(KJ_ASSERT_NONNULL(table->stringToId("Foo-Bar")) == fooBar);
KJ_EXPECT(KJ_ASSERT_NONNULL(table->stringToId("foo-BAR")) == fooBar);
KJ_EXPECT(table->stringToId("foobar") == nullptr);
KJ_EXPECT(table->stringToId("barfoo") == nullptr);
}
KJ_TEST("HttpHeaders::parseRequest") {
HttpHeaderTable::Builder builder;
auto fooBar = builder.add("Foo-Bar");
auto bazQux = builder.add("baz-qux");
auto table = builder.build();
HttpHeaders headers(*table);
auto text = kj::heapString(
"POST /some/path \t HTTP/1.1\r\n"
"Foo-BaR: Baz\r\n"
"Host: example.com\r\n"
"Content-Length: 123\r\n"
"DATE: early\r\n"
"other-Header: yep\r\n"
"with.dots: sure\r\n"
"\r\n");
auto result = headers.tryParseRequest(text.asArray()).get<HttpHeaders::Request>();
KJ_EXPECT(result.method == HttpMethod::POST);
KJ_EXPECT(result.url == "/some/path");
KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::HOST)) == "example.com");
KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::DATE)) == "early");
KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(fooBar)) == "Baz");
KJ_EXPECT(headers.get(bazQux) == nullptr);
KJ_EXPECT(headers.get(HttpHeaderId::CONTENT_TYPE) == nullptr);
KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::CONTENT_LENGTH)) == "123");
KJ_EXPECT(headers.get(HttpHeaderId::TRANSFER_ENCODING) == nullptr);
std::map<kj::StringPtr, kj::StringPtr> unpackedHeaders;
headers.forEach([&](kj::StringPtr name, kj::StringPtr value) {
KJ_EXPECT(unpackedHeaders.insert(std::make_pair(name, value)).second);
});
KJ_EXPECT(unpackedHeaders.size() == 6);
KJ_EXPECT(unpackedHeaders["Content-Length"] == "123");
KJ_EXPECT(unpackedHeaders["Host"] == "example.com");
KJ_EXPECT(unpackedHeaders["Date"] == "early");
KJ_EXPECT(unpackedHeaders["Foo-Bar"] == "Baz");
KJ_EXPECT(unpackedHeaders["other-Header"] == "yep");
KJ_EXPECT(unpackedHeaders["with.dots"] == "sure");
KJ_EXPECT(headers.serializeRequest(result.method, result.url) ==
"POST /some/path HTTP/1.1\r\n"
"Content-Length: 123\r\n"
"Host: example.com\r\n"
"Date: early\r\n"
"Foo-Bar: Baz\r\n"
"other-Header: yep\r\n"
"with.dots: sure\r\n"
"\r\n");
}
KJ_TEST("HttpHeaders::parseResponse") {
HttpHeaderTable::Builder builder;
auto fooBar = builder.add("Foo-Bar");
auto bazQux = builder.add("baz-qux");
auto table = builder.build();
HttpHeaders headers(*table);
auto text = kj::heapString(
"HTTP/1.1\t\t 418\t I'm a teapot\r\n"
"Foo-BaR: Baz\r\n"
"Host: example.com\r\n"
"Content-Length: 123\r\n"
"DATE: early\r\n"
"other-Header: yep\r\n"
"\r\n");
auto result = headers.tryParseResponse(text.asArray()).get<HttpHeaders::Response>();
KJ_EXPECT(result.statusCode == 418);
KJ_EXPECT(result.statusText == "I'm a teapot");
KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::HOST)) == "example.com");
KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::DATE)) == "early");
KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(fooBar)) == "Baz");
KJ_EXPECT(headers.get(bazQux) == nullptr);
KJ_EXPECT(headers.get(HttpHeaderId::CONTENT_TYPE) == nullptr);
KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::CONTENT_LENGTH)) == "123");
KJ_EXPECT(headers.get(HttpHeaderId::TRANSFER_ENCODING) == nullptr);
std::map<kj::StringPtr, kj::StringPtr> unpackedHeaders;
headers.forEach([&](kj::StringPtr name, kj::StringPtr value) {
KJ_EXPECT(unpackedHeaders.insert(std::make_pair(name, value)).second);
});
KJ_EXPECT(unpackedHeaders.size() == 5);
KJ_EXPECT(unpackedHeaders["Content-Length"] == "123");
KJ_EXPECT(unpackedHeaders["Host"] == "example.com");
KJ_EXPECT(unpackedHeaders["Date"] == "early");
KJ_EXPECT(unpackedHeaders["Foo-Bar"] == "Baz");
KJ_EXPECT(unpackedHeaders["other-Header"] == "yep");
KJ_EXPECT(headers.serializeResponse(
result.statusCode, result.statusText) ==
"HTTP/1.1 418 I'm a teapot\r\n"
"Content-Length: 123\r\n"
"Host: example.com\r\n"
"Date: early\r\n"
"Foo-Bar: Baz\r\n"
"other-Header: yep\r\n"
"\r\n");
}
KJ_TEST("HttpHeaders parse invalid") {
auto table = HttpHeaderTable::Builder().build();
HttpHeaders headers(*table);
// NUL byte in request.
{
auto input = kj::heapString(
"POST \0 /some/path \t HTTP/1.1\r\n"
"Foo-BaR: Baz\r\n"
"Host: example.com\r\n"
"DATE: early\r\n"
"other-Header: yep\r\n"
"\r\n");
auto protocolError = headers.tryParseRequest(input).get<HttpHeaders::ProtocolError>();
KJ_EXPECT(protocolError.description == "Request headers have no terminal newline.",
protocolError.description);
KJ_EXPECT(protocolError.rawContent.asChars() == input);
}
// Control character in header name.
{
auto input = kj::heapString(
"POST /some/path \t HTTP/1.1\r\n"
"Foo-BaR: Baz\r\n"
"Cont\001ent-Length: 123\r\n"
"DATE: early\r\n"
"other-Header: yep\r\n"
"\r\n");
auto protocolError = headers.tryParseRequest(input).get<HttpHeaders::ProtocolError>();
KJ_EXPECT(protocolError.description == "The headers sent by your client are not valid.",
protocolError.description);
KJ_EXPECT(protocolError.rawContent.asChars() == input);
}
// Separator character in header name.
{
auto input = kj::heapString(
"POST /some/path \t HTTP/1.1\r\n"
"Foo-BaR: Baz\r\n"
"Host: example.com\r\n"
"DATE/: early\r\n"
"other-Header: yep\r\n"
"\r\n");
auto protocolError = headers.tryParseRequest(input).get<HttpHeaders::ProtocolError>();
KJ_EXPECT(protocolError.description == "The headers sent by your client are not valid.",
protocolError.description);
KJ_EXPECT(protocolError.rawContent.asChars() == input);
}
// Response status code not numeric.
{
auto input = kj::heapString(
"HTTP/1.1\t\t abc\t I'm a teapot\r\n"
"Foo-BaR: Baz\r\n"
"Host: example.com\r\n"
"DATE: early\r\n"
"other-Header: yep\r\n"
"\r\n");
auto protocolError = headers.tryParseRequest(input).get<HttpHeaders::ProtocolError>();
KJ_EXPECT(protocolError.description == "Unrecognized request method.",
protocolError.description);
KJ_EXPECT(protocolError.rawContent.asChars() == input);
}
}
KJ_TEST("HttpHeaders require valid HttpHeaderTable") {
const auto ERROR_MESSAGE =
"HttpHeaders object was constructed from HttpHeaderTable "
"that wasn't fully built yet at the time of construction"_kj;
{
// A tabula rasa is valid.
HttpHeaderTable table;
KJ_REQUIRE(table.isReady());
HttpHeaders headers(table);
}
{
// A future table is not valid.
HttpHeaderTable::Builder builder;
auto& futureTable = builder.getFutureTable();
KJ_REQUIRE(!futureTable.isReady());
auto makeHeadersThenBuild = [&]() {
HttpHeaders headers(futureTable);
auto table = builder.build();
};
KJ_EXPECT_THROW_MESSAGE(ERROR_MESSAGE, makeHeadersThenBuild());
}
{
// A well built table is valid.
HttpHeaderTable::Builder builder;
auto& futureTable = builder.getFutureTable();
KJ_REQUIRE(!futureTable.isReady());
auto ownedTable = builder.build();
KJ_REQUIRE(futureTable.isReady());
KJ_REQUIRE(ownedTable->isReady());
HttpHeaders headers(futureTable);
}
}
KJ_TEST("HttpHeaders validation") {
auto table = HttpHeaderTable::Builder().build();
HttpHeaders headers(*table);
headers.add("Valid-Name", "valid value");
// The HTTP RFC prohibits control characters, but browsers only prohibit \0, \r, and \n. KJ goes
// with the browsers for compatibility.
headers.add("Valid-Name", "valid\x01value");
// The HTTP RFC does not permit non-ASCII values.
// KJ chooses to interpret them as UTF-8, to avoid the need for any expensive conversion.
// Browsers apparently interpret them as LATIN-1. Applications can reinterpet these strings as
// LATIN-1 easily enough if they really need to.
headers.add("Valid-Name", u8"valid€value");
KJ_EXPECT_THROW_MESSAGE("invalid header name", headers.add("Invalid Name", "value"));
KJ_EXPECT_THROW_MESSAGE("invalid header name", headers.add("Invalid@Name", "value"));
KJ_EXPECT_THROW_MESSAGE("invalid header value", headers.set(HttpHeaderId::HOST, "in\nvalid"));
KJ_EXPECT_THROW_MESSAGE("invalid header value", headers.add("Valid-Name", "in\nvalid"));
}
KJ_TEST("HttpHeaders Set-Cookie handling") {
HttpHeaderTable::Builder builder;
auto hCookie = builder.add("Cookie");
auto hSetCookie = builder.add("Set-Cookie");
auto table = builder.build();
HttpHeaders headers(*table);
headers.set(hCookie, "Foo");
headers.add("Cookie", "Bar");
headers.add("Cookie", "Baz");
headers.set(hSetCookie, "Foo");
headers.add("Set-Cookie", "Bar");
headers.add("Set-Cookie", "Baz");
auto text = headers.toString();
KJ_EXPECT(text ==
"Cookie: Foo, Bar, Baz\r\n"
"Set-Cookie: Foo\r\n"
"Set-Cookie: Bar\r\n"
"Set-Cookie: Baz\r\n"
"\r\n", text);
}
// =======================================================================================
class ReadFragmenter final: public kj::AsyncIoStream {
public:
ReadFragmenter(AsyncIoStream& inner, size_t limit): inner(inner), limit(limit) {}
Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override {
return inner.read(buffer, minBytes, kj::max(minBytes, kj::min(limit, maxBytes)));
}
Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return inner.tryRead(buffer, minBytes, kj::max(minBytes, kj::min(limit, maxBytes)));
}
Maybe<uint64_t> tryGetLength() override { return inner.tryGetLength(); }
Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override {
return inner.pumpTo(output, amount);
}
Promise<void> write(const void* buffer, size_t size) override {
return inner.write(buffer, size);
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
return inner.write(pieces);
}
Maybe<Promise<uint64_t>> tryPumpFrom(AsyncInputStream& input, uint64_t amount) override {
return inner.tryPumpFrom(input, amount);
}
Promise<void> whenWriteDisconnected() override {
return inner.whenWriteDisconnected();
}
void shutdownWrite() override {
return inner.shutdownWrite();
}
void abortRead() override { return inner.abortRead(); }
void getsockopt(int level, int option, void* value, uint* length) override {
return inner.getsockopt(level, option, value, length);
}
void setsockopt(int level, int option, const void* value, uint length) override {
return inner.setsockopt(level, option, value, length);
}
void getsockname(struct sockaddr* addr, uint* length) override {
return inner.getsockname(addr, length);
}
void getpeername(struct sockaddr* addr, uint* length) override {
return inner.getsockname(addr, length);
}
private:
kj::AsyncIoStream& inner;
size_t limit;
};
template <typename T>
class InitializeableArray: public Array<T> {
public:
InitializeableArray(std::initializer_list<T> init)
: Array<T>(kj::heapArray(init)) {}
};
enum Side { BOTH, CLIENT_ONLY, SERVER_ONLY };
struct HeaderTestCase {
HttpHeaderId id;
kj::StringPtr value;
};
struct HttpRequestTestCase {
kj::StringPtr raw;
HttpMethod method;
kj::StringPtr path;
InitializeableArray<HeaderTestCase> requestHeaders;
kj::Maybe<uint64_t> requestBodySize;
InitializeableArray<kj::StringPtr> requestBodyParts;
Side side = BOTH;
// TODO(cleanup): Delete this constructor if/when we move to C++14.
HttpRequestTestCase(kj::StringPtr raw, HttpMethod method, kj::StringPtr path,
InitializeableArray<HeaderTestCase> requestHeaders,
kj::Maybe<uint64_t> requestBodySize,
InitializeableArray<kj::StringPtr> requestBodyParts,
Side side = BOTH)
: raw(raw), method(method), path(path), requestHeaders(kj::mv(requestHeaders)),
requestBodySize(requestBodySize), requestBodyParts(kj::mv(requestBodyParts)),
side(side) {}
};
struct HttpResponseTestCase {
kj::StringPtr raw;
uint64_t statusCode;
kj::StringPtr statusText;
InitializeableArray<HeaderTestCase> responseHeaders;
kj::Maybe<uint64_t> responseBodySize;
InitializeableArray<kj::StringPtr> responseBodyParts;
HttpMethod method = HttpMethod::GET;
Side side = BOTH;
// TODO(cleanup): Delete this constructor if/when we move to C++14.
HttpResponseTestCase(kj::StringPtr raw, uint64_t statusCode, kj::StringPtr statusText,
InitializeableArray<HeaderTestCase> responseHeaders,
kj::Maybe<uint64_t> responseBodySize,
InitializeableArray<kj::StringPtr> responseBodyParts,
HttpMethod method = HttpMethod::GET,
Side side = BOTH)
: raw(raw), statusCode(statusCode), statusText(statusText),
responseHeaders(kj::mv(responseHeaders)), responseBodySize(responseBodySize),
responseBodyParts(kj::mv(responseBodyParts)), method(method), side(side) {}
};
struct HttpTestCase {
HttpRequestTestCase request;
HttpResponseTestCase response;
};
kj::Promise<void> writeEach(kj::AsyncOutputStream& out, kj::ArrayPtr<const kj::StringPtr> parts) {
if (parts.size() == 0) return kj::READY_NOW;
return out.write(parts[0].begin(), parts[0].size())
.then([&out,parts]() {
return writeEach(out, parts.slice(1, parts.size()));
});
}
kj::Promise<void> expectRead(kj::AsyncInputStream& in, kj::StringPtr expected) {
if (expected.size() == 0) return kj::READY_NOW;
auto buffer = kj::heapArray<char>(expected.size());
auto promise = in.tryRead(buffer.begin(), 1, buffer.size());
return promise.then(kj::mvCapture(buffer, [&in,expected](kj::Array<char> buffer, size_t amount) {
if (amount == 0) {
KJ_FAIL_ASSERT("expected data never sent", expected);
}
auto actual = buffer.slice(0, amount);
if (memcmp(actual.begin(), expected.begin(), actual.size()) != 0) {
KJ_FAIL_ASSERT("data from stream doesn't match expected", expected, actual);
}
return expectRead(in, expected.slice(amount));
}));
}
kj::Promise<void> expectRead(kj::AsyncInputStream& in, kj::ArrayPtr<const byte> expected) {
if (expected.size() == 0) return kj::READY_NOW;
auto buffer = kj::heapArray<byte>(expected.size());
auto promise = in.tryRead(buffer.begin(), 1, buffer.size());
return promise.then(kj::mvCapture(buffer, [&in,expected](kj::Array<byte> buffer, size_t amount) {
if (amount == 0) {
KJ_FAIL_ASSERT("expected data never sent", expected);
}
auto actual = buffer.slice(0, amount);
if (memcmp(actual.begin(), expected.begin(), actual.size()) != 0) {
KJ_FAIL_ASSERT("data from stream doesn't match expected", expected, actual);
}
return expectRead(in, expected.slice(amount, expected.size()));
}));
}
kj::Promise<void> expectEnd(kj::AsyncInputStream& in) {
static char buffer;
auto promise = in.tryRead(&buffer, 1, 1);
return promise.then([](size_t amount) {
KJ_ASSERT(amount == 0, "expected EOF");
});
}
void testHttpClientRequest(kj::WaitScope& waitScope, const HttpRequestTestCase& testCase,
kj::TwoWayPipe pipe) {
auto serverTask = expectRead(*pipe.ends[1], testCase.raw).then([&]() {
static const char SIMPLE_RESPONSE[] =
"HTTP/1.1 200 OK\r\n"
"Content-Length: 0\r\n"
"\r\n";
return pipe.ends[1]->write(SIMPLE_RESPONSE, strlen(SIMPLE_RESPONSE));
}).then([&]() -> kj::Promise<void> {
return kj::NEVER_DONE;
});
HttpHeaderTable table;
auto client = newHttpClient(table, *pipe.ends[0]);
HttpHeaders headers(table);
for (auto& header: testCase.requestHeaders) {
headers.set(header.id, header.value);
}
auto request = client->request(testCase.method, testCase.path, headers, testCase.requestBodySize);
if (testCase.requestBodyParts.size() > 0) {
writeEach(*request.body, testCase.requestBodyParts).wait(waitScope);
}
request.body = nullptr;
auto clientTask = request.response
.then([&](HttpClient::Response&& response) {
auto promise = response.body->readAllText();
return promise.attach(kj::mv(response.body));
}).ignoreResult();
serverTask.exclusiveJoin(kj::mv(clientTask)).wait(waitScope);
// Verify no more data written by client.
client = nullptr;
pipe.ends[0]->shutdownWrite();
KJ_EXPECT(pipe.ends[1]->readAllText().wait(waitScope) == "");
}
void testHttpClientResponse(kj::WaitScope& waitScope, const HttpResponseTestCase& testCase,
size_t readFragmentSize, kj::TwoWayPipe pipe) {
ReadFragmenter fragmenter(*pipe.ends[0], readFragmentSize);
auto expectedReqText = testCase.method == HttpMethod::GET || testCase.method == HttpMethod::HEAD
? kj::str(testCase.method, " / HTTP/1.1\r\n\r\n")
: kj::str(testCase.method, " / HTTP/1.1\r\nContent-Length: 0\r\n");
auto serverTask = expectRead(*pipe.ends[1], expectedReqText).then([&]() {
return pipe.ends[1]->write(testCase.raw.begin(), testCase.raw.size());
}).then([&]() -> kj::Promise<void> {
pipe.ends[1]->shutdownWrite();
return kj::NEVER_DONE;
});
HttpHeaderTable table;
auto client = newHttpClient(table, fragmenter);
HttpHeaders headers(table);
auto request = client->request(testCase.method, "/", headers, uint64_t(0));
request.body = nullptr;
auto clientTask = request.response
.then([&](HttpClient::Response&& response) {
KJ_EXPECT(response.statusCode == testCase.statusCode);
KJ_EXPECT(response.statusText == testCase.statusText);
for (auto& header: testCase.responseHeaders) {
KJ_EXPECT(KJ_ASSERT_NONNULL(response.headers->get(header.id)) == header.value);
}
auto promise = response.body->readAllText();
return promise.attach(kj::mv(response.body));
}).then([&](kj::String body) {
KJ_EXPECT(body == kj::strArray(testCase.responseBodyParts, ""), body);
});
serverTask.exclusiveJoin(kj::mv(clientTask)).wait(waitScope);
// Verify no more data written by client.
client = nullptr;
pipe.ends[0]->shutdownWrite();
KJ_EXPECT(pipe.ends[1]->readAllText().wait(waitScope) == "");
}
void testHttpClient(kj::WaitScope& waitScope, HttpHeaderTable& table,
HttpClient& client, const HttpTestCase& testCase) {
KJ_CONTEXT(testCase.request.raw, testCase.response.raw);
HttpHeaders headers(table);
for (auto& header: testCase.request.requestHeaders) {
headers.set(header.id, header.value);
}
auto request = client.request(
testCase.request.method, testCase.request.path, headers, testCase.request.requestBodySize);
for (auto& part: testCase.request.requestBodyParts) {
request.body->write(part.begin(), part.size()).wait(waitScope);
}
request.body = nullptr;
auto response = request.response.wait(waitScope);
KJ_EXPECT(response.statusCode == testCase.response.statusCode);
auto body = response.body->readAllText().wait(waitScope);
if (testCase.request.method == HttpMethod::HEAD) {
KJ_EXPECT(body == "");
} else {
KJ_EXPECT(body == kj::strArray(testCase.response.responseBodyParts, ""), body);
}
}
class TestHttpService final: public HttpService {
public:
TestHttpService(const HttpRequestTestCase& expectedRequest,
const HttpResponseTestCase& response,
HttpHeaderTable& table)
: singleExpectedRequest(&expectedRequest),
singleResponse(&response),
responseHeaders(table) {}
TestHttpService(kj::ArrayPtr<const HttpTestCase> testCases,
HttpHeaderTable& table)
: singleExpectedRequest(nullptr),
singleResponse(nullptr),
testCases(testCases),
responseHeaders(table) {}
uint getRequestCount() { return requestCount; }
kj::Promise<void> request(
HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::AsyncInputStream& requestBody, Response& responseSender) override {
auto& expectedRequest = testCases == nullptr ? *singleExpectedRequest :
testCases[requestCount % testCases.size()].request;
auto& response = testCases == nullptr ? *singleResponse :
testCases[requestCount % testCases.size()].response;
++requestCount;
KJ_EXPECT(method == expectedRequest.method, method);
KJ_EXPECT(url == expectedRequest.path, url);
for (auto& header: expectedRequest.requestHeaders) {
KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(header.id)) == header.value);
}
auto size = requestBody.tryGetLength();
KJ_IF_MAYBE(expectedSize, expectedRequest.requestBodySize) {
KJ_IF_MAYBE(s, size) {
KJ_EXPECT(*s == *expectedSize, *s);
} else {
KJ_FAIL_EXPECT("tryGetLength() returned nullptr; expected known size");
}
} else {
KJ_EXPECT(size == nullptr);
}
return requestBody.readAllText()
.then([this,&expectedRequest,&response,&responseSender](kj::String text) {
KJ_EXPECT(text == kj::strArray(expectedRequest.requestBodyParts, ""), text);
responseHeaders.clear();
for (auto& header: response.responseHeaders) {
responseHeaders.set(header.id, header.value);
}
auto stream = responseSender.send(response.statusCode, response.statusText,
responseHeaders, response.responseBodySize);
auto promise = writeEach(*stream, response.responseBodyParts);
return promise.attach(kj::mv(stream));
});
}
private:
const HttpRequestTestCase* singleExpectedRequest;
const HttpResponseTestCase* singleResponse;
kj::ArrayPtr<const HttpTestCase> testCases;
HttpHeaders responseHeaders;
uint requestCount = 0;
};
void testHttpServerRequest(kj::WaitScope& waitScope, kj::Timer& timer,
const HttpRequestTestCase& requestCase,
const HttpResponseTestCase& responseCase,
kj::TwoWayPipe pipe) {
HttpHeaderTable table;
TestHttpService service(requestCase, responseCase, table);
HttpServer server(timer, table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
pipe.ends[1]->write(requestCase.raw.begin(), requestCase.raw.size()).wait(waitScope);
pipe.ends[1]->shutdownWrite();
expectRead(*pipe.ends[1], responseCase.raw).wait(waitScope);
listenTask.wait(waitScope);
KJ_EXPECT(service.getRequestCount() == 1);
}
kj::ArrayPtr<const HttpRequestTestCase> requestTestCases() {
static const auto HUGE_STRING = kj::strArray(kj::repeat("abcdefgh", 4096), "");
static const auto HUGE_REQUEST = kj::str(
"GET / HTTP/1.1\r\n"
"Host: ", HUGE_STRING, "\r\n"
"\r\n");
static const HttpRequestTestCase REQUEST_TEST_CASES[] {
{
"GET /foo/bar HTTP/1.1\r\n"
"Host: example.com\r\n"
"\r\n",
HttpMethod::GET,
"/foo/bar",
{{HttpHeaderId::HOST, "example.com"}},
uint64_t(0), {},
},
{
"HEAD /foo/bar HTTP/1.1\r\n"
"Host: example.com\r\n"
"\r\n",
HttpMethod::HEAD,
"/foo/bar",
{{HttpHeaderId::HOST, "example.com"}},
uint64_t(0), {},
},
{
"POST / HTTP/1.1\r\n"
"Content-Length: 9\r\n"
"Host: example.com\r\n"
"Content-Type: text/plain\r\n"
"\r\n"
"foobarbaz",
HttpMethod::POST,
"/",
{
{HttpHeaderId::HOST, "example.com"},
{HttpHeaderId::CONTENT_TYPE, "text/plain"},
},
9, { "foo", "bar", "baz" },
},
{
"POST / HTTP/1.1\r\n"
"Transfer-Encoding: chunked\r\n"
"Host: example.com\r\n"
"Content-Type: text/plain\r\n"
"\r\n"
"3\r\n"
"foo\r\n"
"6\r\n"
"barbaz\r\n"
"0\r\n"
"\r\n",
HttpMethod::POST,
"/",
{
{HttpHeaderId::HOST, "example.com"},
{HttpHeaderId::CONTENT_TYPE, "text/plain"},
},
nullptr, { "foo", "barbaz" },
},
{
"POST / HTTP/1.1\r\n"
"Transfer-Encoding: chunked\r\n"
"Host: example.com\r\n"
"Content-Type: text/plain\r\n"
"\r\n"
"1d\r\n"
"0123456789abcdef0123456789abc\r\n"
"0\r\n"
"\r\n",
HttpMethod::POST,
"/",
{
{HttpHeaderId::HOST, "example.com"},
{HttpHeaderId::CONTENT_TYPE, "text/plain"},
},
nullptr, { "0123456789abcdef0123456789abc" },
},
{
HUGE_REQUEST,
HttpMethod::GET,
"/",
{{HttpHeaderId::HOST, HUGE_STRING}},
uint64_t(0), {}
},
{
"GET /foo/bar HTTP/1.1\r\n"
"Content-Length: 6\r\n"
"Host: example.com\r\n"
"\r\n"
"foobar",
HttpMethod::GET,
"/foo/bar",
{{HttpHeaderId::HOST, "example.com"}},
uint64_t(6), { "foobar" },
},
{
"GET /foo/bar HTTP/1.1\r\n"
"Transfer-Encoding: chunked\r\n"
"Host: example.com\r\n"
"\r\n"
"3\r\n"
"foo\r\n"
"3\r\n"
"bar\r\n"
"0\r\n"
"\r\n",
HttpMethod::GET,
"/foo/bar",
{{HttpHeaderId::HOST, "example.com"},
{HttpHeaderId::TRANSFER_ENCODING, "chunked"}},
nullptr, { "foo", "bar" },
}
};
// TODO(cleanup): A bug in GCC 4.8, fixed in 4.9, prevents REQUEST_TEST_CASES from implicitly
// casting to our return type.
return kj::arrayPtr(REQUEST_TEST_CASES, kj::size(REQUEST_TEST_CASES));
}
kj::ArrayPtr<const HttpResponseTestCase> responseTestCases() {
static const HttpResponseTestCase RESPONSE_TEST_CASES[] {
{
"HTTP/1.1 200 OK\r\n"
"Content-Type: text/plain\r\n"
"Connection: close\r\n"
"\r\n"
"baz qux",
200, "OK",
{{HttpHeaderId::CONTENT_TYPE, "text/plain"}},
nullptr, {"baz qux"},
HttpMethod::GET,
CLIENT_ONLY, // Server never sends connection: close
},
{
"HTTP/1.1 200 OK\r\n"
"Content-Type: text/plain\r\n"
"Transfer-Encoding: identity\r\n"
"Content-Length: foobar\r\n" // intentionally wrong
"\r\n"
"baz qux",
200, "OK",
{{HttpHeaderId::CONTENT_TYPE, "text/plain"}},
nullptr, {"baz qux"},
HttpMethod::GET,
CLIENT_ONLY, // Server never sends transfer-encoding: identity
},
{
"HTTP/1.1 200 OK\r\n"
"Content-Type: text/plain\r\n"
"\r\n"
"baz qux",
200, "OK",
{{HttpHeaderId::CONTENT_TYPE, "text/plain"}},
nullptr, {"baz qux"},
HttpMethod::GET,
CLIENT_ONLY, // Server never sends non-delimited message
},
{
"HTTP/1.1 200 OK\r\n"
"Content-Length: 123\r\n"
"Content-Type: text/plain\r\n"
"\r\n",
200, "OK",
{{HttpHeaderId::CONTENT_TYPE, "text/plain"}},
123, {},
HttpMethod::HEAD,
},
{
"HTTP/1.1 200 OK\r\n"
"Content-Length: foobar\r\n"
"Content-Type: text/plain\r\n"
"\r\n",
200, "OK",
{{HttpHeaderId::CONTENT_TYPE, "text/plain"},
{HttpHeaderId::CONTENT_LENGTH, "foobar"}},
123, {},
HttpMethod::HEAD,
},
// Zero-length expected size response to HEAD request has no Content-Length header.
{
"HTTP/1.1 200 OK\r\n"
"\r\n",
200, "OK",
{},
uint64_t(0), {},
HttpMethod::HEAD,
},
{
"HTTP/1.1 204 No Content\r\n"
"\r\n",
204, "No Content",
{},
uint64_t(0), {},
},
{
"HTTP/1.1 205 Reset Content\r\n"
"Content-Length: 0\r\n"
"\r\n",
205, "Reset Content",
{},
uint64_t(0), {},
},
{
"HTTP/1.1 304 Not Modified\r\n"
"\r\n",
304, "Not Modified",
{},
uint64_t(0), {},
},
{
"HTTP/1.1 200 OK\r\n"
"Content-Length: 8\r\n"
"Content-Type: text/plain\r\n"
"\r\n"
"quxcorge",
200, "OK",
{{HttpHeaderId::CONTENT_TYPE, "text/plain"}},
8, { "qux", "corge" }
},
{
"HTTP/1.1 200 OK\r\n"
"Transfer-Encoding: chunked\r\n"
"Content-Type: text/plain\r\n"
"\r\n"
"3\r\n"
"qux\r\n"
"5\r\n"
"corge\r\n"
"0\r\n"
"\r\n",
200, "OK",
{{HttpHeaderId::CONTENT_TYPE, "text/plain"}},
nullptr, { "qux", "corge" }
},
};
// TODO(cleanup): A bug in GCC 4.8, fixed in 4.9, prevents RESPONSE_TEST_CASES from implicitly
// casting to our return type.
return kj::arrayPtr(RESPONSE_TEST_CASES, kj::size(RESPONSE_TEST_CASES));
}
KJ_TEST("HttpClient requests") {
KJ_HTTP_TEST_SETUP_IO;
for (auto& testCase: requestTestCases()) {
if (testCase.side == SERVER_ONLY) continue;
KJ_CONTEXT(testCase.raw);
testHttpClientRequest(waitScope, testCase, KJ_HTTP_TEST_CREATE_2PIPE);
}
}
KJ_TEST("HttpClient responses") {
KJ_HTTP_TEST_SETUP_IO;
size_t FRAGMENT_SIZES[] = { 1, 2, 3, 4, 5, 6, 7, 8, 16, 31, kj::maxValue };
for (auto& testCase: responseTestCases()) {
if (testCase.side == SERVER_ONLY) continue;
for (size_t fragmentSize: FRAGMENT_SIZES) {
KJ_CONTEXT(testCase.raw, fragmentSize);
testHttpClientResponse(waitScope, testCase, fragmentSize, KJ_HTTP_TEST_CREATE_2PIPE);
}
}
}
KJ_TEST("HttpClient canceled write") {
KJ_HTTP_TEST_SETUP_IO;
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
auto serverPromise = pipe.ends[1]->readAllText();
{
HttpHeaderTable table;
auto client = newHttpClient(table, *pipe.ends[0]);
auto body = kj::heapArray<byte>(4096);
memset(body.begin(), 0xcf, body.size());
auto req = client->request(HttpMethod::POST, "/", HttpHeaders(table), uint64_t(4096));
// Note: This poll() forces the server to receive what was written so far. Otherwise,
// cancelling the write below may in fact cancel the previous write as well.
KJ_EXPECT(!serverPromise.poll(waitScope));
// Start a write and immediately cancel it.
{
auto ignore KJ_UNUSED = req.body->write(body.begin(), body.size());
}
KJ_EXPECT_THROW_MESSAGE("overwrote", req.body->write("foo", 3).wait(waitScope));
req.body = nullptr;
KJ_EXPECT(!serverPromise.poll(waitScope));
KJ_EXPECT_THROW_MESSAGE("can't start new request until previous request body",
client->request(HttpMethod::GET, "/", HttpHeaders(table)).response.wait(waitScope));
}
pipe.ends[0]->shutdownWrite();
auto text = serverPromise.wait(waitScope);
KJ_EXPECT(text == "POST / HTTP/1.1\r\nContent-Length: 4096\r\n\r\n", text);
}
KJ_TEST("HttpClient chunked body gather-write") {
KJ_HTTP_TEST_SETUP_IO;
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
auto serverPromise = pipe.ends[1]->readAllText();
{
HttpHeaderTable table;
auto client = newHttpClient(table, *pipe.ends[0]);
auto req = client->request(HttpMethod::POST, "/", HttpHeaders(table));
kj::ArrayPtr<const byte> bodyParts[] = {
"foo"_kj.asBytes(), " "_kj.asBytes(),
"bar"_kj.asBytes(), " "_kj.asBytes(),
"baz"_kj.asBytes()
};
req.body->write(kj::arrayPtr(bodyParts, kj::size(bodyParts))).wait(waitScope);
req.body = nullptr;
// Wait for a response so the client has a chance to end the request body with a 0-chunk.
kj::StringPtr responseText = "HTTP/1.1 204 No Content\r\n\r\n";
pipe.ends[1]->write(responseText.begin(), responseText.size()).wait(waitScope);
auto response = req.response.wait(waitScope);
}
pipe.ends[0]->shutdownWrite();
auto text = serverPromise.wait(waitScope);
KJ_EXPECT(text == "POST / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n"
"b\r\nfoo bar baz\r\n0\r\n\r\n", text);
}
KJ_TEST("HttpClient chunked body pump from fixed length stream") {
class FixedBodyStream final: public kj::AsyncInputStream {
Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
auto n = kj::min(body.size(), maxBytes);
n = kj::max(n, minBytes);
n = kj::min(n, body.size());
memcpy(buffer, body.begin(), n);
body = body.slice(n);
return n;
}
Maybe<uint64_t> tryGetLength() override { return body.size(); }
kj::StringPtr body = "foo bar baz";
};
KJ_HTTP_TEST_SETUP_IO;
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
auto serverPromise = pipe.ends[1]->readAllText();
{
HttpHeaderTable table;
auto client = newHttpClient(table, *pipe.ends[0]);
auto req = client->request(HttpMethod::POST, "/", HttpHeaders(table));
FixedBodyStream bodyStream;
bodyStream.pumpTo(*req.body).wait(waitScope);
req.body = nullptr;
// Wait for a response so the client has a chance to end the request body with a 0-chunk.
kj::StringPtr responseText = "HTTP/1.1 204 No Content\r\n\r\n";
pipe.ends[1]->write(responseText.begin(), responseText.size()).wait(waitScope);
auto response = req.response.wait(waitScope);
}
pipe.ends[0]->shutdownWrite();
auto text = serverPromise.wait(waitScope);
KJ_EXPECT(text == "POST / HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n"
"b\r\nfoo bar baz\r\n0\r\n\r\n", text);
}
KJ_TEST("HttpServer requests") {
HttpResponseTestCase RESPONSE = {
"HTTP/1.1 200 OK\r\n"
"Content-Length: 3\r\n"
"\r\n"
"foo",
200, "OK",
{},
3, {"foo"}
};
HttpResponseTestCase HEAD_RESPONSE = {
"HTTP/1.1 200 OK\r\n"
"Content-Length: 3\r\n"
"\r\n",
200, "OK",
{},
3, {"foo"}
};
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
for (auto& testCase: requestTestCases()) {
if (testCase.side == CLIENT_ONLY) continue;
KJ_CONTEXT(testCase.raw);
testHttpServerRequest(waitScope, timer, testCase,
testCase.method == HttpMethod::HEAD ? HEAD_RESPONSE : RESPONSE,
KJ_HTTP_TEST_CREATE_2PIPE);
}
}
KJ_TEST("HttpServer responses") {
HttpRequestTestCase REQUEST = {
"GET / HTTP/1.1\r\n"
"\r\n",
HttpMethod::GET,
"/",
{},
uint64_t(0), {},
};
HttpRequestTestCase HEAD_REQUEST = {
"HEAD / HTTP/1.1\r\n"
"\r\n",
HttpMethod::HEAD,
"/",
{},
uint64_t(0), {},
};
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
for (auto& testCase: responseTestCases()) {
if (testCase.side == CLIENT_ONLY) continue;
KJ_CONTEXT(testCase.raw);
testHttpServerRequest(waitScope, timer,
testCase.method == HttpMethod::HEAD ? HEAD_REQUEST : REQUEST, testCase,
KJ_HTTP_TEST_CREATE_2PIPE);
}
}
// -----------------------------------------------------------------------------
kj::ArrayPtr<const HttpTestCase> pipelineTestCases() {
static const HttpTestCase PIPELINE_TESTS[] = {
{
{
"GET / HTTP/1.1\r\n"
"\r\n",
HttpMethod::GET, "/", {}, uint64_t(0), {},
},
{
"HTTP/1.1 200 OK\r\n"
"Content-Length: 7\r\n"
"\r\n"
"foo bar",
200, "OK", {}, 7, { "foo bar" }
},
},
{
{
"POST /foo HTTP/1.1\r\n"
"Content-Length: 6\r\n"
"\r\n"
"grault",
HttpMethod::POST, "/foo", {}, 6, { "grault" },
},
{
"HTTP/1.1 404 Not Found\r\n"
"Content-Length: 13\r\n"
"\r\n"
"baz qux corge",
404, "Not Found", {}, 13, { "baz qux corge" }
},
},
// Throw a zero-size request/response into the pipeline to check for a bug that existed with
// them previously.
{
{
"POST /foo HTTP/1.1\r\n"
"Content-Length: 0\r\n"
"\r\n",
HttpMethod::POST, "/foo", {}, uint64_t(0), {},
},
{
"HTTP/1.1 200 OK\r\n"
"Content-Length: 0\r\n"
"\r\n",
200, "OK", {}, uint64_t(0), {}
},
},
// Also a zero-size chunked request/response.
{
{
"POST /foo HTTP/1.1\r\n"
"Transfer-Encoding: chunked\r\n"
"\r\n"
"0\r\n"
"\r\n",
HttpMethod::POST, "/foo", {}, nullptr, {},
},
{
"HTTP/1.1 200 OK\r\n"
"Transfer-Encoding: chunked\r\n"
"\r\n"
"0\r\n"
"\r\n",
200, "OK", {}, nullptr, {}
},
},
{
{
"POST /bar HTTP/1.1\r\n"
"Transfer-Encoding: chunked\r\n"
"\r\n"
"6\r\n"
"garply\r\n"
"5\r\n"
"waldo\r\n"
"0\r\n"
"\r\n",
HttpMethod::POST, "/bar", {}, nullptr, { "garply", "waldo" },
},
{
"HTTP/1.1 200 OK\r\n"
"Transfer-Encoding: chunked\r\n"
"\r\n"
"4\r\n"
"fred\r\n"
"5\r\n"
"plugh\r\n"
"0\r\n"
"\r\n",
200, "OK", {}, nullptr, { "fred", "plugh" }
},
},
{
{
"HEAD / HTTP/1.1\r\n"
"\r\n",
HttpMethod::HEAD, "/", {}, uint64_t(0), {},
},
{
"HTTP/1.1 200 OK\r\n"
"Content-Length: 7\r\n"
"\r\n",
200, "OK", {}, 7, { "foo bar" }
},
},
// Zero-length expected size response to HEAD request has no Content-Length header.
{
{
"HEAD / HTTP/1.1\r\n"
"\r\n",
HttpMethod::HEAD, "/", {}, uint64_t(0), {},
},
{
"HTTP/1.1 200 OK\r\n"
"\r\n",
200, "OK", {}, uint64_t(0), {}, HttpMethod::HEAD,
},
},
};
// TODO(cleanup): A bug in GCC 4.8, fixed in 4.9, prevents RESPONSE_TEST_CASES from implicitly
// casting to our return type.
return kj::arrayPtr(PIPELINE_TESTS, kj::size(PIPELINE_TESTS));
}
KJ_TEST("HttpClient pipeline") {
auto PIPELINE_TESTS = pipelineTestCases();
KJ_HTTP_TEST_SETUP_IO;
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
kj::Promise<void> writeResponsesPromise = kj::READY_NOW;
for (auto& testCase: PIPELINE_TESTS) {
writeResponsesPromise = writeResponsesPromise
.then([&]() {
return expectRead(*pipe.ends[1], testCase.request.raw);
}).then([&]() {
return pipe.ends[1]->write(testCase.response.raw.begin(), testCase.response.raw.size());
});
}
HttpHeaderTable table;
auto client = newHttpClient(table, *pipe.ends[0]);
for (auto& testCase: PIPELINE_TESTS) {
testHttpClient(waitScope, table, *client, testCase);
}
client = nullptr;
pipe.ends[0]->shutdownWrite();
writeResponsesPromise.wait(waitScope);
}
KJ_TEST("HttpClient parallel pipeline") {
auto PIPELINE_TESTS = pipelineTestCases();
KJ_HTTP_TEST_SETUP_IO;
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
kj::Promise<void> readRequestsPromise = kj::READY_NOW;
kj::Promise<void> writeResponsesPromise = kj::READY_NOW;
for (auto& testCase: PIPELINE_TESTS) {
auto forked = readRequestsPromise
.then([&]() {
return expectRead(*pipe.ends[1], testCase.request.raw);
}).fork();
readRequestsPromise = forked.addBranch();
// Don't write each response until the corresponding request is received.
auto promises = kj::heapArrayBuilder<kj::Promise<void>>(2);
promises.add(forked.addBranch());
promises.add(kj::mv(writeResponsesPromise));
writeResponsesPromise = kj::joinPromises(promises.finish()).then([&]() {
return pipe.ends[1]->write(testCase.response.raw.begin(), testCase.response.raw.size());
});
}
HttpHeaderTable table;
auto client = newHttpClient(table, *pipe.ends[0]);
auto responsePromises = KJ_MAP(testCase, PIPELINE_TESTS) {
KJ_CONTEXT(testCase.request.raw, testCase.response.raw);
HttpHeaders headers(table);
for (auto& header: testCase.request.requestHeaders) {
headers.set(header.id, header.value);
}
auto request = client->request(
testCase.request.method, testCase.request.path, headers, testCase.request.requestBodySize);
for (auto& part: testCase.request.requestBodyParts) {
request.body->write(part.begin(), part.size()).wait(waitScope);
}
return kj::mv(request.response);
};
for (auto i: kj::indices(PIPELINE_TESTS)) {
auto& testCase = PIPELINE_TESTS[i];
auto response = responsePromises[i].wait(waitScope);
KJ_EXPECT(response.statusCode == testCase.response.statusCode);
auto body = response.body->readAllText().wait(waitScope);
if (testCase.request.method == HttpMethod::HEAD) {
KJ_EXPECT(body == "");
} else {
KJ_EXPECT(body == kj::strArray(testCase.response.responseBodyParts, ""), body);
}
}
client = nullptr;
pipe.ends[0]->shutdownWrite();
writeResponsesPromise.wait(waitScope);
}
KJ_TEST("HttpServer pipeline") {
auto PIPELINE_TESTS = pipelineTestCases();
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
HttpHeaderTable table;
TestHttpService service(PIPELINE_TESTS, table);
HttpServer server(timer, table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
for (auto& testCase: PIPELINE_TESTS) {
KJ_CONTEXT(testCase.request.raw, testCase.response.raw);
pipe.ends[1]->write(testCase.request.raw.begin(), testCase.request.raw.size())
.wait(waitScope);
expectRead(*pipe.ends[1], testCase.response.raw).wait(waitScope);
}
pipe.ends[1]->shutdownWrite();
listenTask.wait(waitScope);
KJ_EXPECT(service.getRequestCount() == kj::size(PIPELINE_TESTS));
}
KJ_TEST("HttpServer parallel pipeline") {
auto PIPELINE_TESTS = pipelineTestCases();
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
auto allRequestText =
kj::strArray(KJ_MAP(testCase, PIPELINE_TESTS) { return testCase.request.raw; }, "");
auto allResponseText =
kj::strArray(KJ_MAP(testCase, PIPELINE_TESTS) { return testCase.response.raw; }, "");
HttpHeaderTable table;
TestHttpService service(PIPELINE_TESTS, table);
HttpServer server(timer, table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
pipe.ends[1]->write(allRequestText.begin(), allRequestText.size()).wait(waitScope);
pipe.ends[1]->shutdownWrite();
auto rawResponse = pipe.ends[1]->readAllText().wait(waitScope);
KJ_EXPECT(rawResponse == allResponseText, rawResponse);
listenTask.wait(waitScope);
KJ_EXPECT(service.getRequestCount() == kj::size(PIPELINE_TESTS));
}
KJ_TEST("HttpClient <-> HttpServer") {
auto PIPELINE_TESTS = pipelineTestCases();
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
HttpHeaderTable table;
TestHttpService service(PIPELINE_TESTS, table);
HttpServer server(timer, table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[1]));
auto client = newHttpClient(table, *pipe.ends[0]);
for (auto& testCase: PIPELINE_TESTS) {
testHttpClient(waitScope, table, *client, testCase);
}
client = nullptr;
pipe.ends[0]->shutdownWrite();
listenTask.wait(waitScope);
KJ_EXPECT(service.getRequestCount() == kj::size(PIPELINE_TESTS));
}
// -----------------------------------------------------------------------------
KJ_TEST("HttpInputStream requests") {
KJ_HTTP_TEST_SETUP_IO;
kj::HttpHeaderTable table;
auto pipe = kj::newOneWayPipe();
auto input = newHttpInputStream(*pipe.in, table);
kj::Promise<void> writeQueue = kj::READY_NOW;
for (auto& testCase: requestTestCases()) {
writeQueue = writeQueue.then([&]() {
return pipe.out->write(testCase.raw.begin(), testCase.raw.size());
});
}
writeQueue = writeQueue.then([&]() {
pipe.out = nullptr;
});
for (auto& testCase: requestTestCases()) {
KJ_CONTEXT(testCase.raw);
KJ_ASSERT(input->awaitNextMessage().wait(waitScope));
auto req = input->readRequest().wait(waitScope);
KJ_EXPECT(req.method == testCase.method);
KJ_EXPECT(req.url == testCase.path);
for (auto& header: testCase.requestHeaders) {
KJ_EXPECT(KJ_ASSERT_NONNULL(req.headers.get(header.id)) == header.value);
}
auto body = req.body->readAllText().wait(waitScope);
KJ_EXPECT(body == kj::strArray(testCase.requestBodyParts, ""));
}
writeQueue.wait(waitScope);
KJ_EXPECT(!input->awaitNextMessage().wait(waitScope));
}
KJ_TEST("HttpInputStream responses") {
KJ_HTTP_TEST_SETUP_IO;
kj::HttpHeaderTable table;
auto pipe = kj::newOneWayPipe();
auto input = newHttpInputStream(*pipe.in, table);
kj::Promise<void> writeQueue = kj::READY_NOW;
for (auto& testCase: responseTestCases()) {
if (testCase.side == CLIENT_ONLY) continue; // skip Connection: close case.
writeQueue = writeQueue.then([&]() {
return pipe.out->write(testCase.raw.begin(), testCase.raw.size());
});
}
writeQueue = writeQueue.then([&]() {
pipe.out = nullptr;
});
for (auto& testCase: responseTestCases()) {
if (testCase.side == CLIENT_ONLY) continue; // skip Connection: close case.
KJ_CONTEXT(testCase.raw);
KJ_ASSERT(input->awaitNextMessage().wait(waitScope));
auto resp = input->readResponse(testCase.method).wait(waitScope);
KJ_EXPECT(resp.statusCode == testCase.statusCode);
KJ_EXPECT(resp.statusText == testCase.statusText);
for (auto& header: testCase.responseHeaders) {
KJ_EXPECT(KJ_ASSERT_NONNULL(resp.headers.get(header.id)) == header.value);
}
auto body = resp.body->readAllText().wait(waitScope);
KJ_EXPECT(body == kj::strArray(testCase.responseBodyParts, ""));
}
writeQueue.wait(waitScope);
KJ_EXPECT(!input->awaitNextMessage().wait(waitScope));
}
KJ_TEST("HttpInputStream bare messages") {
KJ_HTTP_TEST_SETUP_IO;
kj::HttpHeaderTable table;
auto pipe = kj::newOneWayPipe();
auto input = newHttpInputStream(*pipe.in, table);
kj::StringPtr messages =
"Content-Length: 6\r\n"
"\r\n"
"foobar"
"Content-Length: 11\r\n"
"Content-Type: some/type\r\n"
"\r\n"
"bazquxcorge"
"Transfer-Encoding: chunked\r\n"
"\r\n"
"6\r\n"
"grault\r\n"
"b\r\n"
"garplywaldo\r\n"
"0\r\n"
"\r\n"_kj;
kj::Promise<void> writeTask = pipe.out->write(messages.begin(), messages.size())
.then([&]() { pipe.out = nullptr; });
{
KJ_ASSERT(input->awaitNextMessage().wait(waitScope));
auto message = input->readMessage().wait(waitScope);
KJ_EXPECT(KJ_ASSERT_NONNULL(message.headers.get(HttpHeaderId::CONTENT_LENGTH)) == "6");
KJ_EXPECT(message.body->readAllText().wait(waitScope) == "foobar");
}
{
KJ_ASSERT(input->awaitNextMessage().wait(waitScope));
auto message = input->readMessage().wait(waitScope);
KJ_EXPECT(KJ_ASSERT_NONNULL(message.headers.get(HttpHeaderId::CONTENT_LENGTH)) == "11");
KJ_EXPECT(KJ_ASSERT_NONNULL(message.headers.get(HttpHeaderId::CONTENT_TYPE)) == "some/type");
KJ_EXPECT(message.body->readAllText().wait(waitScope) == "bazquxcorge");
}
{
KJ_ASSERT(input->awaitNextMessage().wait(waitScope));
auto message = input->readMessage().wait(waitScope);
KJ_EXPECT(KJ_ASSERT_NONNULL(message.headers.get(HttpHeaderId::TRANSFER_ENCODING)) == "chunked");
KJ_EXPECT(message.body->readAllText().wait(waitScope) == "graultgarplywaldo");
}
writeTask.wait(waitScope);
KJ_EXPECT(!input->awaitNextMessage().wait(waitScope));
}
// -----------------------------------------------------------------------------
KJ_TEST("WebSocket core protocol") {
KJ_HTTP_TEST_SETUP_IO;
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
auto client = newWebSocket(kj::mv(pipe.ends[0]), nullptr);
auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr);
auto mediumString = kj::strArray(kj::repeat(kj::StringPtr("123456789"), 30), "");
auto bigString = kj::strArray(kj::repeat(kj::StringPtr("123456789"), 10000), "");
auto clientTask = client->send(kj::StringPtr("hello"))
.then([&]() { return client->send(mediumString); })
.then([&]() { return client->send(bigString); })
.then([&]() { return client->send(kj::StringPtr("world").asBytes()); })
.then([&]() { return client->close(1234, "bored"); })
.then([&]() { KJ_EXPECT(client->sentByteCount() == 90307)});
{
auto message = server->receive().wait(waitScope);
KJ_ASSERT(message.is<kj::String>());
KJ_EXPECT(message.get<kj::String>() == "hello");
}
{
auto message = server->receive().wait(waitScope);
KJ_ASSERT(message.is<kj::String>());
KJ_EXPECT(message.get<kj::String>() == mediumString);
}
{
auto message = server->receive().wait(waitScope);
KJ_ASSERT(message.is<kj::String>());
KJ_EXPECT(message.get<kj::String>() == bigString);
}
{
auto message = server->receive().wait(waitScope);
KJ_ASSERT(message.is<kj::Array<byte>>());
KJ_EXPECT(kj::str(message.get<kj::Array<byte>>().asChars()) == "world");
}
{
auto message = server->receive().wait(waitScope);
KJ_ASSERT(message.is<WebSocket::Close>());
KJ_EXPECT(message.get<WebSocket::Close>().code == 1234);
KJ_EXPECT(message.get<WebSocket::Close>().reason == "bored");
KJ_EXPECT(server->receivedByteCount() == 90307);
}
auto serverTask = server->close(4321, "whatever");
{
auto message = client->receive().wait(waitScope);
KJ_ASSERT(message.is<WebSocket::Close>());
KJ_EXPECT(message.get<WebSocket::Close>().code == 4321);
KJ_EXPECT(message.get<WebSocket::Close>().reason == "whatever");
KJ_EXPECT(client->receivedByteCount() == 12);
}
clientTask.wait(waitScope);
serverTask.wait(waitScope);
}
KJ_TEST("WebSocket fragmented") {
KJ_HTTP_TEST_SETUP_IO;
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
auto client = kj::mv(pipe.ends[0]);
auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr);
byte DATA[] = {
0x01, 0x06, 'h', 'e', 'l', 'l', 'o', ' ',
0x00, 0x03, 'w', 'o', 'r',
0x80, 0x02, 'l', 'd',
};
auto clientTask = client->write(DATA, sizeof(DATA));
{
auto message = server->receive().wait(waitScope);
KJ_ASSERT(message.is<kj::String>());
KJ_EXPECT(message.get<kj::String>() == "hello world");
}
clientTask.wait(waitScope);
}
class FakeEntropySource final: public EntropySource {
public:
void generate(kj::ArrayPtr<byte> buffer) override {
static constexpr byte DUMMY[4] = { 12, 34, 56, 78 };
for (auto i: kj::indices(buffer)) {
buffer[i] = DUMMY[i % sizeof(DUMMY)];
}
}
};
KJ_TEST("WebSocket masked") {
KJ_HTTP_TEST_SETUP_IO;
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
FakeEntropySource maskGenerator;
auto client = kj::mv(pipe.ends[0]);
auto server = newWebSocket(kj::mv(pipe.ends[1]), maskGenerator);
byte DATA[] = {
0x81, 0x86, 12, 34, 56, 78, 'h' ^ 12, 'e' ^ 34, 'l' ^ 56, 'l' ^ 78, 'o' ^ 12, ' ' ^ 34,
};
auto clientTask = client->write(DATA, sizeof(DATA));
auto serverTask = server->send(kj::StringPtr("hello "));
{
auto message = server->receive().wait(waitScope);
KJ_ASSERT(message.is<kj::String>());
KJ_EXPECT(message.get<kj::String>() == "hello ");
}
expectRead(*client, DATA).wait(waitScope);
clientTask.wait(waitScope);
serverTask.wait(waitScope);
}
KJ_TEST("WebSocket unsolicited pong") {
KJ_HTTP_TEST_SETUP_IO;
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
auto client = kj::mv(pipe.ends[0]);
auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr);
byte DATA[] = {
0x01, 0x06, 'h', 'e', 'l', 'l', 'o', ' ',
0x8A, 0x03, 'f', 'o', 'o',
0x80, 0x05, 'w', 'o', 'r', 'l', 'd',
};
auto clientTask = client->write(DATA, sizeof(DATA));
{
auto message = server->receive().wait(waitScope);
KJ_ASSERT(message.is<kj::String>());
KJ_EXPECT(message.get<kj::String>() == "hello world");
}
clientTask.wait(waitScope);
}
KJ_TEST("WebSocket ping") {
KJ_HTTP_TEST_SETUP_IO;
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
auto client = kj::mv(pipe.ends[0]);
auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr);
// Be extra-annoying by having the ping arrive between fragments.
byte DATA[] = {
0x01, 0x06, 'h', 'e', 'l', 'l', 'o', ' ',
0x89, 0x03, 'f', 'o', 'o',
0x80, 0x05, 'w', 'o', 'r', 'l', 'd',
};
auto clientTask = client->write(DATA, sizeof(DATA));
{
auto message = server->receive().wait(waitScope);
KJ_ASSERT(message.is<kj::String>());
KJ_EXPECT(message.get<kj::String>() == "hello world");
}
auto serverTask = server->send(kj::StringPtr("bar"));
byte EXPECTED[] = {
0x8A, 0x03, 'f', 'o', 'o', // pong
0x81, 0x03, 'b', 'a', 'r', // message
};
expectRead(*client, EXPECTED).wait(waitScope);
clientTask.wait(waitScope);
serverTask.wait(waitScope);
}
KJ_TEST("WebSocket ping mid-send") {
KJ_HTTP_TEST_SETUP_IO;
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
auto client = kj::mv(pipe.ends[0]);
auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr);
auto bigString = kj::strArray(kj::repeat(kj::StringPtr("12345678"), 65536), "");
auto serverTask = server->send(bigString).eagerlyEvaluate(nullptr);
byte DATA[] = {
0x89, 0x03, 'f', 'o', 'o', // ping
0x81, 0x03, 'b', 'a', 'r', // some other message
};
auto clientTask = client->write(DATA, sizeof(DATA));
{
auto message = server->receive().wait(waitScope);
KJ_ASSERT(message.is<kj::String>());
KJ_EXPECT(message.get<kj::String>() == "bar");
}
byte EXPECTED1[] = { 0x81, 0x7f, 0, 0, 0, 0, 0, 8, 0, 0 };
expectRead(*client, EXPECTED1).wait(waitScope);
expectRead(*client, bigString).wait(waitScope);
byte EXPECTED2[] = { 0x8A, 0x03, 'f', 'o', 'o' };
expectRead(*client, EXPECTED2).wait(waitScope);
clientTask.wait(waitScope);
serverTask.wait(waitScope);
}
class InputOutputPair final: public kj::AsyncIoStream {
// Creates an AsyncIoStream out of an AsyncInputStream and an AsyncOutputStream.
public:
InputOutputPair(kj::Own<kj::AsyncInputStream> in, kj::Own<kj::AsyncOutputStream> out)
: in(kj::mv(in)), out(kj::mv(out)) {}
kj::Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override {
return in->read(buffer, minBytes, maxBytes);
}
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return in->tryRead(buffer, minBytes, maxBytes);
}
Maybe<uint64_t> tryGetLength() override {
return in->tryGetLength();
}
Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount = kj::maxValue) override {
return in->pumpTo(output, amount);
}
kj::Promise<void> write(const void* buffer, size_t size) override {
return out->write(buffer, size);
}
kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
return out->write(pieces);
}
kj::Maybe<kj::Promise<uint64_t>> tryPumpFrom(
kj::AsyncInputStream& input, uint64_t amount = kj::maxValue) override {
return out->tryPumpFrom(input, amount);
}
Promise<void> whenWriteDisconnected() override {
return out->whenWriteDisconnected();
}
void shutdownWrite() override {
out = nullptr;
}
private:
kj::Own<kj::AsyncInputStream> in;
kj::Own<kj::AsyncOutputStream> out;
};
KJ_TEST("WebSocket double-ping mid-send") {
KJ_HTTP_TEST_SETUP_IO;
auto upPipe = newOneWayPipe();
auto downPipe = newOneWayPipe();
InputOutputPair client(kj::mv(downPipe.in), kj::mv(upPipe.out));
auto server = newWebSocket(kj::heap<InputOutputPair>(kj::mv(upPipe.in), kj::mv(downPipe.out)),
nullptr);
auto bigString = kj::strArray(kj::repeat(kj::StringPtr("12345678"), 65536), "");
auto serverTask = server->send(bigString).eagerlyEvaluate(nullptr);
byte DATA[] = {
0x89, 0x03, 'f', 'o', 'o', // ping
0x89, 0x03, 'q', 'u', 'x', // ping2
0x81, 0x03, 'b', 'a', 'r', // some other message
};
auto clientTask = client.write(DATA, sizeof(DATA));
{
auto message = server->receive().wait(waitScope);
KJ_ASSERT(message.is<kj::String>());
KJ_EXPECT(message.get<kj::String>() == "bar");
}
byte EXPECTED1[] = { 0x81, 0x7f, 0, 0, 0, 0, 0, 8, 0, 0 };
expectRead(client, EXPECTED1).wait(waitScope);
expectRead(client, bigString).wait(waitScope);
byte EXPECTED2[] = { 0x8A, 0x03, 'q', 'u', 'x' };
expectRead(client, EXPECTED2).wait(waitScope);
clientTask.wait(waitScope);
serverTask.wait(waitScope);
}
KJ_TEST("WebSocket ping received during pong send") {
KJ_HTTP_TEST_SETUP_IO;
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
auto client = kj::mv(pipe.ends[0]);
auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr);
// Send a very large ping so that sending the pong takes a while. Then send a second ping
// immediately after.
byte PREFIX[] = { 0x89, 0x7f, 0, 0, 0, 0, 0, 8, 0, 0 };
auto bigString = kj::strArray(kj::repeat(kj::StringPtr("12345678"), 65536), "");
byte POSTFIX[] = {
0x89, 0x03, 'f', 'o', 'o',
0x81, 0x03, 'b', 'a', 'r',
};
kj::ArrayPtr<const byte> parts[] = {PREFIX, bigString.asBytes(), POSTFIX};
auto clientTask = client->write(parts);
{
auto message = server->receive().wait(waitScope);
KJ_ASSERT(message.is<kj::String>());
KJ_EXPECT(message.get<kj::String>() == "bar");
}
byte EXPECTED1[] = { 0x8A, 0x7f, 0, 0, 0, 0, 0, 8, 0, 0 };
expectRead(*client, EXPECTED1).wait(waitScope);
expectRead(*client, bigString).wait(waitScope);
byte EXPECTED2[] = { 0x8A, 0x03, 'f', 'o', 'o' };
expectRead(*client, EXPECTED2).wait(waitScope);
clientTask.wait(waitScope);
}
KJ_TEST("WebSocket pump byte counting") {
KJ_HTTP_TEST_SETUP_IO;
auto pipe1 = KJ_HTTP_TEST_CREATE_2PIPE;
auto pipe2 = KJ_HTTP_TEST_CREATE_2PIPE;
FakeEntropySource maskGenerator;
auto server1 = newWebSocket(kj::mv(pipe1.ends[1]), nullptr);
auto client2 = newWebSocket(kj::mv(pipe2.ends[0]), maskGenerator);
auto server2 = newWebSocket(kj::mv(pipe2.ends[1]), nullptr);
auto pumpTask = server1->pumpTo(*client2);
auto receiveTask = server2->receive();
// Client sends three bytes of a valid message then disconnects.
const char DATA[] = {0x01, 0x06, 'h'};
pipe1.ends[0]->write(DATA, 3).wait(waitScope);
pipe1.ends[0] = nullptr;
// The pump completes successfully, forwarding the disconnect.
pumpTask.wait(waitScope);
// The eventual receiver gets a disconnect exception.
// (Note: We don't use KJ_EXPECT_THROW here because under -fno-exceptions it forks and we lose
// state.)
receiveTask.then([](auto) {
KJ_FAIL_EXPECT("expected exception");
}, [](kj::Exception&& e) {
KJ_EXPECT(e.getType() == kj::Exception::Type::DISCONNECTED);
}).wait(waitScope);
KJ_EXPECT(server1->receivedByteCount() == 3);
#if KJ_NO_RTTI
// Optimized socket pump will be disabled, so only whole messages are counted by client2/server2.
KJ_EXPECT(client2->sentByteCount() == 0);
KJ_EXPECT(server2->receivedByteCount() == 0);
#else
KJ_EXPECT(client2->sentByteCount() == 3);
KJ_EXPECT(server2->receivedByteCount() == 3);
#endif
}
KJ_TEST("WebSocket pump disconnect on send") {
KJ_HTTP_TEST_SETUP_IO;
auto pipe1 = KJ_HTTP_TEST_CREATE_2PIPE;
auto pipe2 = KJ_HTTP_TEST_CREATE_2PIPE;
FakeEntropySource maskGenerator;
auto client1 = newWebSocket(kj::mv(pipe1.ends[0]), maskGenerator);
auto server1 = newWebSocket(kj::mv(pipe1.ends[1]), nullptr);
auto client2 = newWebSocket(kj::mv(pipe2.ends[0]), maskGenerator);
auto pumpTask = server1->pumpTo(*client2);
auto sendTask = client1->send("hello"_kj);
// Endpoint reads three bytes and then disconnects.
char buffer[3];
pipe2.ends[1]->read(buffer, 3).wait(waitScope);
pipe2.ends[1] = nullptr;
// Pump throws disconnected.
KJ_EXPECT_THROW_RECOVERABLE(DISCONNECTED, pumpTask.wait(waitScope));
// client1 may or may not have been able to send its whole message depending on buffering.
sendTask.then([]() {}, [](kj::Exception&& e) {
KJ_EXPECT(e.getType() == kj::Exception::Type::DISCONNECTED);
}).wait(waitScope);
}
KJ_TEST("WebSocket pump disconnect on receive") {
KJ_HTTP_TEST_SETUP_IO;
auto pipe1 = KJ_HTTP_TEST_CREATE_2PIPE;
auto pipe2 = KJ_HTTP_TEST_CREATE_2PIPE;
FakeEntropySource maskGenerator;
auto server1 = newWebSocket(kj::mv(pipe1.ends[1]), nullptr);
auto client2 = newWebSocket(kj::mv(pipe2.ends[0]), maskGenerator);
auto server2 = newWebSocket(kj::mv(pipe2.ends[1]), nullptr);
auto pumpTask = server1->pumpTo(*client2);
auto receiveTask = server2->receive();
// Client sends three bytes of a valid message then disconnects.
const char DATA[] = {0x01, 0x06, 'h'};
pipe1.ends[0]->write(DATA, 3).wait(waitScope);
pipe1.ends[0] = nullptr;
// The pump completes successfully, forwarding the disconnect.
pumpTask.wait(waitScope);
// The eventual receiver gets a disconnect exception.
KJ_EXPECT_THROW(DISCONNECTED, receiveTask.wait(waitScope));
}
class TestWebSocketService final: public HttpService, private kj::TaskSet::ErrorHandler {
public:
TestWebSocketService(HttpHeaderTable& headerTable, HttpHeaderId hMyHeader)
: headerTable(headerTable), hMyHeader(hMyHeader), tasks(*this) {}
kj::Promise<void> request(
HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::AsyncInputStream& requestBody, Response& response) override {
KJ_ASSERT(headers.isWebSocket());
HttpHeaders responseHeaders(headerTable);
KJ_IF_MAYBE(h, headers.get(hMyHeader)) {
responseHeaders.set(hMyHeader, kj::str("respond-", *h));
}
if (url == "/return-error") {
response.send(404, "Not Found", responseHeaders, uint64_t(0));
return kj::READY_NOW;
} else if (url == "/websocket") {
auto ws = response.acceptWebSocket(responseHeaders);
return doWebSocket(*ws, "start-inline").attach(kj::mv(ws));
} else {
KJ_FAIL_ASSERT("unexpected path", url);
}
}
private:
HttpHeaderTable& headerTable;
HttpHeaderId hMyHeader;
kj::TaskSet tasks;
void taskFailed(kj::Exception&& exception) override {
KJ_LOG(ERROR, exception);
}
static kj::Promise<void> doWebSocket(WebSocket& ws, kj::StringPtr message) {
auto copy = kj::str(message);
return ws.send(copy).attach(kj::mv(copy))
.then([&ws]() {
return ws.receive();
}).then([&ws](WebSocket::Message&& message) {
KJ_SWITCH_ONEOF(message) {
KJ_CASE_ONEOF(str, kj::String) {
return doWebSocket(ws, kj::str("reply:", str));
}
KJ_CASE_ONEOF(data, kj::Array<byte>) {
return doWebSocket(ws, kj::str("reply:", data));
}
KJ_CASE_ONEOF(close, WebSocket::Close) {
auto reason = kj::str("close-reply:", close.reason);
return ws.close(close.code + 1, reason).attach(kj::mv(reason));
}
}
KJ_UNREACHABLE;
});
}
};
const char WEBSOCKET_REQUEST_HANDSHAKE[] =
" HTTP/1.1\r\n"
"Connection: Upgrade\r\n"
"Upgrade: websocket\r\n"
"Sec-WebSocket-Key: DCI4TgwiOE4MIjhODCI4Tg==\r\n"
"Sec-WebSocket-Version: 13\r\n"
"My-Header: foo\r\n"
"\r\n";
const char WEBSOCKET_RESPONSE_HANDSHAKE[] =
"HTTP/1.1 101 Switching Protocols\r\n"
"Connection: Upgrade\r\n"
"Upgrade: websocket\r\n"
"Sec-WebSocket-Accept: pShtIFKT0s8RYZvnWY/CrjQD8CM=\r\n"
"My-Header: respond-foo\r\n"
"\r\n";
const char WEBSOCKET_RESPONSE_HANDSHAKE_ERROR[] =
"HTTP/1.1 404 Not Found\r\n"
"Content-Length: 0\r\n"
"My-Header: respond-foo\r\n"
"\r\n";
const byte WEBSOCKET_FIRST_MESSAGE_INLINE[] =
{ 0x81, 0x0c, 's','t','a','r','t','-','i','n','l','i','n','e' };
const byte WEBSOCKET_SEND_MESSAGE[] =
{ 0x81, 0x83, 12, 34, 56, 78, 'b'^12, 'a'^34, 'r'^56 };
const byte WEBSOCKET_REPLY_MESSAGE[] =
{ 0x81, 0x09, 'r','e','p','l','y',':','b','a','r' };
const byte WEBSOCKET_SEND_CLOSE[] =
{ 0x88, 0x85, 12, 34, 56, 78, 0x12^12, 0x34^34, 'q'^56, 'u'^78, 'x'^12 };
const byte WEBSOCKET_REPLY_CLOSE[] =
{ 0x88, 0x11, 0x12, 0x35, 'c','l','o','s','e','-','r','e','p','l','y',':','q','u','x' };
template <size_t s>
kj::ArrayPtr<const byte> asBytes(const char (&chars)[s]) {
return kj::ArrayPtr<const char>(chars, s - 1).asBytes();
}
void testWebSocketClient(kj::WaitScope& waitScope, HttpHeaderTable& headerTable,
kj::HttpHeaderId hMyHeader, HttpClient& client) {
kj::HttpHeaders headers(headerTable);
headers.set(hMyHeader, "foo");
auto response = client.openWebSocket("/websocket", headers).wait(waitScope);
KJ_EXPECT(response.statusCode == 101);
KJ_EXPECT(response.statusText == "Switching Protocols", response.statusText);
KJ_EXPECT(KJ_ASSERT_NONNULL(response.headers->get(hMyHeader)) == "respond-foo");
KJ_ASSERT(response.webSocketOrBody.is<kj::Own<WebSocket>>());
auto ws = kj::mv(response.webSocketOrBody.get<kj::Own<WebSocket>>());
{
auto message = ws->receive().wait(waitScope);
KJ_ASSERT(message.is<kj::String>());
KJ_EXPECT(message.get<kj::String>() == "start-inline");
}
ws->send(kj::StringPtr("bar")).wait(waitScope);
{
auto message = ws->receive().wait(waitScope);
KJ_ASSERT(message.is<kj::String>());
KJ_EXPECT(message.get<kj::String>() == "reply:bar");
}
ws->close(0x1234, "qux").wait(waitScope);
{
auto message = ws->receive().wait(waitScope);
KJ_ASSERT(message.is<WebSocket::Close>());
KJ_EXPECT(message.get<WebSocket::Close>().code == 0x1235);
KJ_EXPECT(message.get<WebSocket::Close>().reason == "close-reply:qux");
}
}
inline kj::Promise<void> writeA(kj::AsyncOutputStream& out, kj::ArrayPtr<const byte> data) {
return out.write(data.begin(), data.size());
}
KJ_TEST("HttpClient WebSocket handshake") {
KJ_HTTP_TEST_SETUP_IO;
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE);
auto serverTask = expectRead(*pipe.ends[1], request)
.then([&]() { return writeA(*pipe.ends[1], asBytes(WEBSOCKET_RESPONSE_HANDSHAKE)); })
.then([&]() { return writeA(*pipe.ends[1], WEBSOCKET_FIRST_MESSAGE_INLINE); })
.then([&]() { return expectRead(*pipe.ends[1], WEBSOCKET_SEND_MESSAGE); })
.then([&]() { return writeA(*pipe.ends[1], WEBSOCKET_REPLY_MESSAGE); })
.then([&]() { return expectRead(*pipe.ends[1], WEBSOCKET_SEND_CLOSE); })
.then([&]() { return writeA(*pipe.ends[1], WEBSOCKET_REPLY_CLOSE); })
.eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); });
HttpHeaderTable::Builder tableBuilder;
HttpHeaderId hMyHeader = tableBuilder.add("My-Header");
auto headerTable = tableBuilder.build();
FakeEntropySource entropySource;
HttpClientSettings clientSettings;
clientSettings.entropySource = entropySource;
auto client = newHttpClient(*headerTable, *pipe.ends[0], clientSettings);
testWebSocketClient(waitScope, *headerTable, hMyHeader, *client);
serverTask.wait(waitScope);
}
KJ_TEST("HttpClient WebSocket error") {
KJ_HTTP_TEST_SETUP_IO;
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE);
auto serverTask = expectRead(*pipe.ends[1], request)
.then([&]() { return writeA(*pipe.ends[1], asBytes(WEBSOCKET_RESPONSE_HANDSHAKE_ERROR)); })
.then([&]() { return expectRead(*pipe.ends[1], request); })
.then([&]() { return writeA(*pipe.ends[1], asBytes(WEBSOCKET_RESPONSE_HANDSHAKE_ERROR)); })
.eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); });
HttpHeaderTable::Builder tableBuilder;
HttpHeaderId hMyHeader = tableBuilder.add("My-Header");
auto headerTable = tableBuilder.build();
FakeEntropySource entropySource;
HttpClientSettings clientSettings;
clientSettings.entropySource = entropySource;
auto client = newHttpClient(*headerTable, *pipe.ends[0], clientSettings);
kj::HttpHeaders headers(*headerTable);
headers.set(hMyHeader, "foo");
{
auto response = client->openWebSocket("/websocket", headers).wait(waitScope);
KJ_EXPECT(response.statusCode == 404);
KJ_EXPECT(response.statusText == "Not Found", response.statusText);
KJ_EXPECT(KJ_ASSERT_NONNULL(response.headers->get(hMyHeader)) == "respond-foo");
KJ_ASSERT(response.webSocketOrBody.is<kj::Own<AsyncInputStream>>());
}
{
auto response = client->openWebSocket("/websocket", headers).wait(waitScope);
KJ_EXPECT(response.statusCode == 404);
KJ_EXPECT(response.statusText == "Not Found", response.statusText);
KJ_EXPECT(KJ_ASSERT_NONNULL(response.headers->get(hMyHeader)) == "respond-foo");
KJ_ASSERT(response.webSocketOrBody.is<kj::Own<AsyncInputStream>>());
}
serverTask.wait(waitScope);
}
KJ_TEST("HttpServer WebSocket handshake") {
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
HttpHeaderTable::Builder tableBuilder;
HttpHeaderId hMyHeader = tableBuilder.add("My-Header");
auto headerTable = tableBuilder.build();
TestWebSocketService service(*headerTable, hMyHeader);
HttpServer server(timer, *headerTable, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE);
writeA(*pipe.ends[1], request.asBytes()).wait(waitScope);
expectRead(*pipe.ends[1], WEBSOCKET_RESPONSE_HANDSHAKE).wait(waitScope);
expectRead(*pipe.ends[1], WEBSOCKET_FIRST_MESSAGE_INLINE).wait(waitScope);
writeA(*pipe.ends[1], WEBSOCKET_SEND_MESSAGE).wait(waitScope);
expectRead(*pipe.ends[1], WEBSOCKET_REPLY_MESSAGE).wait(waitScope);
writeA(*pipe.ends[1], WEBSOCKET_SEND_CLOSE).wait(waitScope);
expectRead(*pipe.ends[1], WEBSOCKET_REPLY_CLOSE).wait(waitScope);
listenTask.wait(waitScope);
}
KJ_TEST("HttpServer WebSocket handshake error") {
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
HttpHeaderTable::Builder tableBuilder;
HttpHeaderId hMyHeader = tableBuilder.add("My-Header");
auto headerTable = tableBuilder.build();
TestWebSocketService service(*headerTable, hMyHeader);
HttpServer server(timer, *headerTable, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
auto request = kj::str("GET /return-error", WEBSOCKET_REQUEST_HANDSHAKE);
writeA(*pipe.ends[1], request.asBytes()).wait(waitScope);
expectRead(*pipe.ends[1], WEBSOCKET_RESPONSE_HANDSHAKE_ERROR).wait(waitScope);
// Can send more requests!
writeA(*pipe.ends[1], request.asBytes()).wait(waitScope);
expectRead(*pipe.ends[1], WEBSOCKET_RESPONSE_HANDSHAKE_ERROR).wait(waitScope);
pipe.ends[1]->shutdownWrite();
listenTask.wait(waitScope);
}
// -----------------------------------------------------------------------------
KJ_TEST("HttpServer request timeout") {
auto PIPELINE_TESTS = pipelineTestCases();
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
HttpHeaderTable table;
TestHttpService service(PIPELINE_TESTS, table);
HttpServerSettings settings;
settings.headerTimeout = 1 * kj::MILLISECONDS;
HttpServer server(timer, table, service, settings);
// Shouldn't hang! Should time out.
auto promise = server.listenHttp(kj::mv(pipe.ends[0]));
KJ_EXPECT(!promise.poll(waitScope));
timer.advanceTo(timer.now() + settings.headerTimeout / 2);
KJ_EXPECT(!promise.poll(waitScope));
timer.advanceTo(timer.now() + settings.headerTimeout);
promise.wait(waitScope);
// Closes the connection without sending anything.
KJ_EXPECT(pipe.ends[1]->readAllText().wait(waitScope) == "");
}
KJ_TEST("HttpServer pipeline timeout") {
auto PIPELINE_TESTS = pipelineTestCases();
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
HttpHeaderTable table;
TestHttpService service(PIPELINE_TESTS, table);
HttpServerSettings settings;
settings.pipelineTimeout = 1 * kj::MILLISECONDS;
HttpServer server(timer, table, service, settings);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
// Do one request.
pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
.wait(waitScope);
expectRead(*pipe.ends[1], PIPELINE_TESTS[0].response.raw).wait(waitScope);
// Listen task should time out even though we didn't shutdown the socket.
KJ_EXPECT(!listenTask.poll(waitScope));
timer.advanceTo(timer.now() + settings.pipelineTimeout / 2);
KJ_EXPECT(!listenTask.poll(waitScope));
timer.advanceTo(timer.now() + settings.pipelineTimeout);
listenTask.wait(waitScope);
// In this case, no data is sent back.
KJ_EXPECT(pipe.ends[1]->readAllText().wait(waitScope) == "");
}
class BrokenHttpService final: public HttpService {
// HttpService that doesn't send a response.
public:
BrokenHttpService() = default;
explicit BrokenHttpService(kj::Exception&& exception): exception(kj::mv(exception)) {}
kj::Promise<void> request(
HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::AsyncInputStream& requestBody, Response& responseSender) override {
return requestBody.readAllBytes().then([this](kj::Array<byte>&&) -> kj::Promise<void> {
KJ_IF_MAYBE(e, exception) {
return kj::cp(*e);
} else {
return kj::READY_NOW;
}
});
}
private:
kj::Maybe<kj::Exception> exception;
};
KJ_TEST("HttpServer no response") {
auto PIPELINE_TESTS = pipelineTestCases();
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
HttpHeaderTable table;
BrokenHttpService service;
HttpServer server(timer, table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
// Do one request.
pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
.wait(waitScope);
auto text = pipe.ends[1]->readAllText().wait(waitScope);
KJ_EXPECT(text ==
"HTTP/1.1 500 Internal Server Error\r\n"
"Connection: close\r\n"
"Content-Length: 51\r\n"
"Content-Type: text/plain\r\n"
"\r\n"
"ERROR: The HttpService did not generate a response.", text);
}
KJ_TEST("HttpServer disconnected") {
auto PIPELINE_TESTS = pipelineTestCases();
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
HttpHeaderTable table;
BrokenHttpService service(KJ_EXCEPTION(DISCONNECTED, "disconnected"));
HttpServer server(timer, table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
// Do one request.
pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
.wait(waitScope);
auto text = pipe.ends[1]->readAllText().wait(waitScope);
KJ_EXPECT(text == "", text);
}
KJ_TEST("HttpServer overloaded") {
auto PIPELINE_TESTS = pipelineTestCases();
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
HttpHeaderTable table;
BrokenHttpService service(KJ_EXCEPTION(OVERLOADED, "overloaded"));
HttpServer server(timer, table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
// Do one request.
pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
.wait(waitScope);
auto text = pipe.ends[1]->readAllText().wait(waitScope);
KJ_EXPECT(text.startsWith("HTTP/1.1 503 Service Unavailable"), text);
}
KJ_TEST("HttpServer unimplemented") {
auto PIPELINE_TESTS = pipelineTestCases();
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
HttpHeaderTable table;
BrokenHttpService service(KJ_EXCEPTION(UNIMPLEMENTED, "unimplemented"));
HttpServer server(timer, table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
// Do one request.
pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
.wait(waitScope);
auto text = pipe.ends[1]->readAllText().wait(waitScope);
KJ_EXPECT(text.startsWith("HTTP/1.1 501 Not Implemented"), text);
}
KJ_TEST("HttpServer threw exception") {
auto PIPELINE_TESTS = pipelineTestCases();
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
HttpHeaderTable table;
BrokenHttpService service(KJ_EXCEPTION(FAILED, "failed"));
HttpServer server(timer, table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
// Do one request.
pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
.wait(waitScope);
auto text = pipe.ends[1]->readAllText().wait(waitScope);
KJ_EXPECT(text.startsWith("HTTP/1.1 500 Internal Server Error"), text);
}
KJ_TEST("HttpServer bad request") {
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
HttpHeaderTable table;
BrokenHttpService service;
HttpServer server(timer, table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
static constexpr auto request = "GET / HTTP/1.1\r\nbad request\r\n\r\n"_kj;
auto writePromise = pipe.ends[1]->write(request.begin(), request.size());
auto response = pipe.ends[1]->readAllText().wait(waitScope);
KJ_EXPECT(writePromise.poll(waitScope));
writePromise.wait(waitScope);
static constexpr auto expectedResponse =
"HTTP/1.1 400 Bad Request\r\n"
"Connection: close\r\n"
"Content-Length: 53\r\n"
"Content-Type: text/plain\r\n"
"\r\n"
"ERROR: The headers sent by your client are not valid."_kj;
KJ_EXPECT(expectedResponse == response, expectedResponse, response);
}
KJ_TEST("HttpServer invalid method") {
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
HttpHeaderTable table;
BrokenHttpService service;
HttpServer server(timer, table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
static constexpr auto request = "bad request\r\n\r\n"_kj;
auto writePromise = pipe.ends[1]->write(request.begin(), request.size());
auto response = pipe.ends[1]->readAllText().wait(waitScope);
KJ_EXPECT(writePromise.poll(waitScope));
writePromise.wait(waitScope);
static constexpr auto expectedResponse =
"HTTP/1.1 501 Not Implemented\r\n"
"Connection: close\r\n"
"Content-Length: 35\r\n"
"Content-Type: text/plain\r\n"
"\r\n"
"ERROR: Unrecognized request method."_kj;
KJ_EXPECT(expectedResponse == response, expectedResponse, response);
}
// Ensure that HttpServerSettings can continue to be constexpr.
KJ_UNUSED static constexpr HttpServerSettings STATIC_CONSTEXPR_SETTINGS {};
class TestErrorHandler: public HttpServerErrorHandler {
public:
kj::Promise<void> handleClientProtocolError(
HttpHeaders::ProtocolError protocolError, kj::HttpService::Response& response) override {
// In a real error handler, you should redact `protocolError.rawContent`.
auto message = kj::str("Saw protocol error: ", protocolError.description, "; rawContent = ",
encodeCEscape(protocolError.rawContent));
return sendError(400, "Bad Request", kj::mv(message), response);
}
kj::Promise<void> handleApplicationError(
kj::Exception exception, kj::Maybe<kj::HttpService::Response&> response) override {
return sendError(500, "Internal Server Error",
kj::str("Saw application error: ", exception.getDescription()), response);
}
kj::Promise<void> handleNoResponse(kj::HttpService::Response& response) override {
return sendError(500, "Internal Server Error", kj::str("Saw no response."), response);
}
static TestErrorHandler instance;
private:
kj::Promise<void> sendError(uint statusCode, kj::StringPtr statusText, String message,
Maybe<HttpService::Response&> response) {
KJ_IF_MAYBE(r, response) {
HttpHeaderTable headerTable;
HttpHeaders headers(headerTable);
auto body = r->send(statusCode, statusText, headers, message.size());
return body->write(message.begin(), message.size()).attach(kj::mv(body), kj::mv(message));
} else {
KJ_LOG(ERROR, "Saw an error but too late to report to client.");
return kj::READY_NOW;
}
}
};
TestErrorHandler TestErrorHandler::instance {};
KJ_TEST("HttpServer no response, custom error handler") {
auto PIPELINE_TESTS = pipelineTestCases();
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
HttpServerSettings settings {};
settings.errorHandler = TestErrorHandler::instance;
HttpHeaderTable table;
BrokenHttpService service;
HttpServer server(timer, table, service, settings);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
// Do one request.
pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
.wait(waitScope);
auto text = pipe.ends[1]->readAllText().wait(waitScope);
KJ_EXPECT(text ==
"HTTP/1.1 500 Internal Server Error\r\n"
"Connection: close\r\n"
"Content-Length: 16\r\n"
"\r\n"
"Saw no response.", text);
}
KJ_TEST("HttpServer threw exception, custom error handler") {
auto PIPELINE_TESTS = pipelineTestCases();
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
HttpServerSettings settings {};
settings.errorHandler = TestErrorHandler::instance;
HttpHeaderTable table;
BrokenHttpService service(KJ_EXCEPTION(FAILED, "failed"));
HttpServer server(timer, table, service, settings);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
// Do one request.
pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
.wait(waitScope);
auto text = pipe.ends[1]->readAllText().wait(waitScope);
KJ_EXPECT(text ==
"HTTP/1.1 500 Internal Server Error\r\n"
"Connection: close\r\n"
"Content-Length: 29\r\n"
"\r\n"
"Saw application error: failed", text);
}
KJ_TEST("HttpServer bad request, custom error handler") {
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
HttpServerSettings settings {};
settings.errorHandler = TestErrorHandler::instance;
HttpHeaderTable table;
BrokenHttpService service;
HttpServer server(timer, table, service, settings);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
static constexpr auto request = "bad request\r\n\r\n"_kj;
auto writePromise = pipe.ends[1]->write(request.begin(), request.size());
auto response = pipe.ends[1]->readAllText().wait(waitScope);
KJ_EXPECT(writePromise.poll(waitScope));
writePromise.wait(waitScope);
static constexpr auto expectedResponse =
"HTTP/1.1 400 Bad Request\r\n"
"Connection: close\r\n"
"Content-Length: 80\r\n"
"\r\n"
"Saw protocol error: Unrecognized request method.; "
"rawContent = bad request\\000\\n"_kj;
KJ_EXPECT(expectedResponse == response, expectedResponse, response);
}
class PartialResponseService final: public HttpService {
// HttpService that sends a partial response then throws.
public:
kj::Promise<void> request(
HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::AsyncInputStream& requestBody, Response& response) override {
return requestBody.readAllBytes()
.then([this,&response](kj::Array<byte>&&) -> kj::Promise<void> {
HttpHeaders headers(table);
auto body = response.send(200, "OK", headers, 32);
auto promise = body->write("foo", 3);
return promise.attach(kj::mv(body)).then([]() -> kj::Promise<void> {
return KJ_EXCEPTION(FAILED, "failed");
});
});
}
private:
kj::Maybe<kj::Exception> exception;
HttpHeaderTable table;
};
KJ_TEST("HttpServer threw exception after starting response") {
auto PIPELINE_TESTS = pipelineTestCases();
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
HttpHeaderTable table;
PartialResponseService service;
HttpServer server(timer, table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
KJ_EXPECT_LOG(ERROR, "HttpService threw exception after generating a partial response");
// Do one request.
pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
.wait(waitScope);
auto text = pipe.ends[1]->readAllText().wait(waitScope);
KJ_EXPECT(text ==
"HTTP/1.1 200 OK\r\n"
"Content-Length: 32\r\n"
"\r\n"
"foo", text);
}
class PartialResponseNoThrowService final: public HttpService {
// HttpService that sends a partial response then returns without throwing.
public:
kj::Promise<void> request(
HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::AsyncInputStream& requestBody, Response& response) override {
return requestBody.readAllBytes()
.then([this,&response](kj::Array<byte>&&) -> kj::Promise<void> {
HttpHeaders headers(table);
auto body = response.send(200, "OK", headers, 32);
auto promise = body->write("foo", 3);
return promise.attach(kj::mv(body));
});
}
private:
kj::Maybe<kj::Exception> exception;
HttpHeaderTable table;
};
KJ_TEST("HttpServer failed to write complete response but didn't throw") {
auto PIPELINE_TESTS = pipelineTestCases();
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
HttpHeaderTable table;
PartialResponseNoThrowService service;
HttpServer server(timer, table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
// Do one request.
pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
.wait(waitScope);
auto text = pipe.ends[1]->readAllText().wait(waitScope);
KJ_EXPECT(text ==
"HTTP/1.1 200 OK\r\n"
"Content-Length: 32\r\n"
"\r\n"
"foo", text);
}
class SimpleInputStream final: public kj::AsyncInputStream {
// An InputStream that returns bytes out of a static string.
public:
SimpleInputStream(kj::StringPtr text)
: unread(text.asBytes()) {}
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
size_t amount = kj::min(maxBytes, unread.size());
memcpy(buffer, unread.begin(), amount);
unread = unread.slice(amount, unread.size());
return amount;
}
private:
kj::ArrayPtr<const byte> unread;
};
class PumpResponseService final: public HttpService {
// HttpService that uses pumpTo() to write a response, without carefully specifying how much to
// pump, but the stream happens to be the right size.
public:
kj::Promise<void> request(
HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::AsyncInputStream& requestBody, Response& response) override {
return requestBody.readAllBytes()
.then([this,&response](kj::Array<byte>&&) -> kj::Promise<void> {
HttpHeaders headers(table);
kj::StringPtr text = "Hello, World!";
auto body = response.send(200, "OK", headers, text.size());
auto stream = kj::heap<SimpleInputStream>(text);
auto promise = stream->pumpTo(*body);
return promise.attach(kj::mv(body), kj::mv(stream))
.then([text](uint64_t amount) {
KJ_EXPECT(amount == text.size());
});
});
}
private:
kj::Maybe<kj::Exception> exception;
HttpHeaderTable table;
};
KJ_TEST("HttpFixedLengthEntityWriter correctly implements tryPumpFrom") {
auto PIPELINE_TESTS = pipelineTestCases();
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
HttpHeaderTable table;
PumpResponseService service;
HttpServer server(timer, table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
// Do one request.
pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
.wait(waitScope);
pipe.ends[1]->shutdownWrite();
auto text = pipe.ends[1]->readAllText().wait(waitScope);
KJ_EXPECT(text ==
"HTTP/1.1 200 OK\r\n"
"Content-Length: 13\r\n"
"\r\n"
"Hello, World!", text);
}
class HangingHttpService final: public HttpService {
// HttpService that hangs forever.
public:
kj::Promise<void> request(
HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::AsyncInputStream& requestBody, Response& responseSender) override {
kj::Promise<void> result = kj::NEVER_DONE;
++inFlight;
return result.attach(kj::defer([this]() {
if (--inFlight == 0) {
KJ_IF_MAYBE(f, onCancelFulfiller) {
f->get()->fulfill();
}
}
}));
}
kj::Promise<void> onCancel() {
auto paf = kj::newPromiseAndFulfiller<void>();
onCancelFulfiller = kj::mv(paf.fulfiller);
return kj::mv(paf.promise);
}
uint inFlight = 0;
private:
kj::Maybe<kj::Exception> exception;
kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> onCancelFulfiller;
};
KJ_TEST("HttpServer cancels request when client disconnects") {
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
HttpHeaderTable table;
HangingHttpService service;
HttpServer server(timer, table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
KJ_EXPECT(service.inFlight == 0);
static constexpr auto request = "GET / HTTP/1.1\r\n\r\n"_kj;
pipe.ends[1]->write(request.begin(), request.size()).wait(waitScope);
auto cancelPromise = service.onCancel();
KJ_EXPECT(!cancelPromise.poll(waitScope));
KJ_EXPECT(service.inFlight == 1);
// Disconnect client and verify server cancels.
pipe.ends[1] = nullptr;
KJ_ASSERT(cancelPromise.poll(waitScope));
KJ_EXPECT(service.inFlight == 0);
cancelPromise.wait(waitScope);
}
class SuspendAfter: private HttpService {
// A SuspendableHttpServiceFactory which responds to the first `n` requests with 200 OK, then
// suspends all subsequent requests until its counter is reset.
public:
void suspendAfter(uint countdownParam) { countdown = countdownParam; }
kj::Maybe<kj::Own<HttpService>> operator()(HttpServer::SuspendableRequest& sr) {
if (countdown == 0) {
suspendedRequest = sr.suspend();
return nullptr;
}
--countdown;
return kj::Own<HttpService>(static_cast<HttpService*>(this), kj::NullDisposer::instance);
}
kj::Maybe<HttpServer::SuspendedRequest> getSuspended() {
KJ_DEFER(suspendedRequest = nullptr);
return kj::mv(suspendedRequest);
}
private:
kj::Promise<void> request(
HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::AsyncInputStream& requestBody, Response& response) override {
HttpHeaders responseHeaders(table);
response.send(200, "OK", responseHeaders);
return requestBody.readAllBytes().ignoreResult();
}
HttpHeaderTable table;
uint countdown = kj::maxValue;
kj::Maybe<HttpServer::SuspendedRequest> suspendedRequest;
};
KJ_TEST("HttpServer can suspend a request") {
// This test sends a single request to an HttpServer three times. First it writes the request to
// its pipe and arranges for the HttpServer to suspend the request. Then it resumes the suspended
// request and arranges for this resumption to be suspended as well. Then it resumes once more and
// arranges for the request to be completed.
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
HttpHeaderTable table;
// This HttpService will not actually be used, because we're passing a factory in to
// listenHttpCleanDrain().
HangingHttpService service;
HttpServer server(timer, table, service);
kj::Maybe<HttpServer::SuspendedRequest> suspendedRequest;
SuspendAfter factory;
{
// Observe the HttpServer suspend.
factory.suspendAfter(0);
auto listenPromise = server.listenHttpCleanDrain(*pipe.ends[0], factory);
static constexpr kj::StringPtr REQUEST =
"POST / HTTP/1.1\r\n"
"Transfer-Encoding: chunked\r\n"
"\r\n"
"6\r\n"
"foobar\r\n"
"0\r\n"
"\r\n"_kj;
pipe.ends[1]->write(REQUEST.begin(), REQUEST.size()).wait(waitScope);
// The listen promise is fulfilled with false.
KJ_EXPECT(listenPromise.poll(waitScope));
KJ_EXPECT(!listenPromise.wait(waitScope));
// And we have a SuspendedRequest.
suspendedRequest = factory.getSuspended();
KJ_EXPECT(suspendedRequest != nullptr);
}
{
// Observe the HttpServer suspend again without reading from the connection.
factory.suspendAfter(0);
auto listenPromise = server.listenHttpCleanDrain(
*pipe.ends[0], factory, kj::mv(suspendedRequest));
// The listen promise is again fulfilled with false.
KJ_EXPECT(listenPromise.poll(waitScope));
KJ_EXPECT(!listenPromise.wait(waitScope));
// We again have a suspendedRequest.
suspendedRequest = factory.getSuspended();
KJ_EXPECT(suspendedRequest != nullptr);
}
{
// The SuspendedRequest is completed.
factory.suspendAfter(1);
auto listenPromise = server.listenHttpCleanDrain(
*pipe.ends[0], factory, kj::mv(suspendedRequest));
auto drainPromise = kj::evalLast([&]() {
return server.drain();
});
// We need to read the response for the HttpServer to drain.
auto readPromise = pipe.ends[1]->readAllText();
// This time, the server drained cleanly.
KJ_EXPECT(listenPromise.poll(waitScope));
KJ_EXPECT(listenPromise.wait(waitScope));
drainPromise.wait(waitScope);
// Close the server side of the pipe so our read promise completes.
pipe.ends[0] = nullptr;
auto response = readPromise.wait(waitScope);
static constexpr kj::StringPtr RESPONSE =
"HTTP/1.1 200 OK\r\n"
"Transfer-Encoding: chunked\r\n"
"\r\n"
"0\r\n"
"\r\n"_kj;
KJ_EXPECT(RESPONSE == response);
}
}
KJ_TEST("HttpServer can suspend and resume pipelined requests") {
// This test sends multiple requests with both Content-Length and Transfer-Encoding: chunked
// bodies, and verifies that suspending both kinds does not corrupt the stream.
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
HttpHeaderTable table;
// This HttpService will not actually be used, because we're passing a factory in to
// listenHttpCleanDrain().
HangingHttpService service;
HttpServer server(timer, table, service);
// We'll suspend the second request.
kj::Maybe<HttpServer::SuspendedRequest> suspendedRequest;
SuspendAfter factory;
static constexpr kj::StringPtr LENGTHFUL_REQUEST =
"POST / HTTP/1.1\r\n"
"Content-Length: 6\r\n"
"\r\n"
"foobar"_kj;
static constexpr kj::StringPtr CHUNKED_REQUEST =
"POST / HTTP/1.1\r\n"
"Transfer-Encoding: chunked\r\n"
"\r\n"
"6\r\n"
"foobar\r\n"
"0\r\n"
"\r\n"_kj;
// Set up several requests; we'll suspend and transfer the second and third one.
auto writePromise = pipe.ends[1]->write(LENGTHFUL_REQUEST.begin(), LENGTHFUL_REQUEST.size())
.then([&]() {
return pipe.ends[1]->write(CHUNKED_REQUEST.begin(), CHUNKED_REQUEST.size());
}).then([&]() {
return pipe.ends[1]->write(LENGTHFUL_REQUEST.begin(), LENGTHFUL_REQUEST.size());
}).then([&]() {
return pipe.ends[1]->write(CHUNKED_REQUEST.begin(), CHUNKED_REQUEST.size());
});
auto readPromise = pipe.ends[1]->readAllText();
{
// Observe the HttpServer suspend the second request.
factory.suspendAfter(1);
auto listenPromise = server.listenHttpCleanDrain(*pipe.ends[0], factory);
KJ_EXPECT(listenPromise.poll(waitScope));
KJ_EXPECT(!listenPromise.wait(waitScope));
suspendedRequest = factory.getSuspended();
KJ_EXPECT(suspendedRequest != nullptr);
}
{
// Let's resume one request and suspend the next pipelined request.
factory.suspendAfter(1);
auto listenPromise = server.listenHttpCleanDrain(
*pipe.ends[0], factory, kj::mv(suspendedRequest));
KJ_EXPECT(listenPromise.poll(waitScope));
KJ_EXPECT(!listenPromise.wait(waitScope));
suspendedRequest = factory.getSuspended();
KJ_EXPECT(suspendedRequest != nullptr);
}
{
// Resume again and run to completion.
factory.suspendAfter(kj::maxValue);
auto listenPromise = server.listenHttpCleanDrain(
*pipe.ends[0], factory, kj::mv(suspendedRequest));
auto drainPromise = kj::evalLast([&]() {
return server.drain();
});
// This time, the server drained cleanly.
KJ_EXPECT(listenPromise.poll(waitScope));
KJ_EXPECT(listenPromise.wait(waitScope));
// No suspended request this time.
suspendedRequest = factory.getSuspended();
KJ_EXPECT(suspendedRequest == nullptr);
drainPromise.wait(waitScope);
}
writePromise.wait(waitScope);
// Close the server side of the pipe so our read promise completes.
pipe.ends[0] = nullptr;
auto responses = readPromise.wait(waitScope);
static constexpr kj::StringPtr RESPONSE =
"HTTP/1.1 200 OK\r\n"
"Transfer-Encoding: chunked\r\n"
"\r\n"
"0\r\n"
"\r\n"_kj;
KJ_EXPECT(kj::str(kj::delimited(kj::repeat(RESPONSE, 4), "")) == responses);
}
KJ_TEST("HttpServer can suspend a request with no leftover") {
// This test verifies that if the request loop's read perfectly ends at the end of message
// headers, leaving no leftover section, we can still successfully suspend and resume.
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
HttpHeaderTable table;
// This HttpService will not actually be used, because we're passing a factory in to
// listenHttpCleanDrain().
HangingHttpService service;
HttpServer server(timer, table, service);
kj::Maybe<HttpServer::SuspendedRequest> suspendedRequest;
SuspendAfter factory;
{
factory.suspendAfter(0);
auto listenPromise = server.listenHttpCleanDrain(*pipe.ends[0], factory);
static constexpr kj::StringPtr REQUEST_HEADERS =
"POST / HTTP/1.1\r\n"
"Transfer-Encoding: chunked\r\n"
"\r\n"_kj;
pipe.ends[1]->write(REQUEST_HEADERS.begin(), REQUEST_HEADERS.size()).wait(waitScope);
// The listen promise is fulfilled with false.
KJ_EXPECT(listenPromise.poll(waitScope));
KJ_EXPECT(!listenPromise.wait(waitScope));
// And we have a SuspendedRequest. We know that it has no leftover, because we only wrote
// headers, no body yet.
suspendedRequest = factory.getSuspended();
KJ_EXPECT(suspendedRequest != nullptr);
}
{
factory.suspendAfter(1);
auto listenPromise = server.listenHttpCleanDrain(
*pipe.ends[0], factory, kj::mv(suspendedRequest));
auto drainPromise = kj::evalLast([&]() {
return server.drain();
});
// We need to read the response for the HttpServer to drain.
auto readPromise = pipe.ends[1]->readAllText();
static constexpr kj::StringPtr REQUEST_BODY =
"6\r\n"
"foobar\r\n"
"0\r\n"
"\r\n"_kj;
pipe.ends[1]->write(REQUEST_BODY.begin(), REQUEST_BODY.size()).wait(waitScope);
// Clean drain.
KJ_EXPECT(listenPromise.poll(waitScope));
KJ_EXPECT(listenPromise.wait(waitScope));
drainPromise.wait(waitScope);
// No SuspendedRequest.
suspendedRequest = factory.getSuspended();
KJ_EXPECT(suspendedRequest == nullptr);
// Close the server side of the pipe so our read promise completes.
pipe.ends[0] = nullptr;
auto response = readPromise.wait(waitScope);
static constexpr kj::StringPtr RESPONSE =
"HTTP/1.1 200 OK\r\n"
"Transfer-Encoding: chunked\r\n"
"\r\n"
"0\r\n"
"\r\n"_kj;
KJ_EXPECT(RESPONSE == response);
}
}
KJ_TEST("HttpServer::listenHttpCleanDrain() factory-created services outlive requests") {
// Test that the lifetimes of factory-created Own<HttpService> objects are handled correctly.
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
HttpHeaderTable table;
// This HttpService will not actually be used, because we're passing a factory in to
// listenHttpCleanDrain().
HangingHttpService service;
HttpServer server(timer, table, service);
uint serviceCount = 0;
// A factory which returns a service whose request() function responds asynchronously.
auto factory = [&](HttpServer::SuspendableRequest&) -> kj::Own<HttpService> {
class ServiceImpl final: public HttpService {
public:
explicit ServiceImpl(uint& serviceCount): serviceCount(++serviceCount) {}
~ServiceImpl() noexcept(false) { --serviceCount; }
KJ_DISALLOW_COPY(ServiceImpl);
kj::Promise<void> request(
HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::AsyncInputStream& requestBody, Response& response) override {
return evalLater([&serviceCount = serviceCount, &table = table, &requestBody, &response]() {
// This KJ_EXPECT here is the entire point of this test.
KJ_EXPECT(serviceCount == 1)
HttpHeaders responseHeaders(table);
response.send(200, "OK", responseHeaders);
return requestBody.readAllBytes().ignoreResult();
});
}
private:
HttpHeaderTable table;
uint& serviceCount;
};
return kj::heap<ServiceImpl>(serviceCount);
};
auto listenPromise = server.listenHttpCleanDrain(*pipe.ends[0], factory);
static constexpr kj::StringPtr REQUEST =
"POST / HTTP/1.1\r\n"
"Content-Length: 6\r\n"
"\r\n"
"foobar"_kj;
pipe.ends[1]->write(REQUEST.begin(), REQUEST.size()).wait(waitScope);
// We need to read the response for the HttpServer to drain.
auto readPromise = pipe.ends[1]->readAllText();
// http-socketpair-test quirk: we must drive the request loop past the point of receiving request
// headers so that our call to server.drain() doesn't prematurely cancel the request.
KJ_EXPECT(!listenPromise.poll(waitScope));
auto drainPromise = kj::evalLast([&]() {
return server.drain();
});
// Clean drain.
KJ_EXPECT(listenPromise.poll(waitScope));
KJ_EXPECT(listenPromise.wait(waitScope));
drainPromise.wait(waitScope);
// Close the server side of the pipe so our read promise completes.
pipe.ends[0] = nullptr;
auto response = readPromise.wait(waitScope);
static constexpr kj::StringPtr RESPONSE =
"HTTP/1.1 200 OK\r\n"
"Transfer-Encoding: chunked\r\n"
"\r\n"
"0\r\n"
"\r\n"_kj;
KJ_EXPECT(RESPONSE == response);
}
// -----------------------------------------------------------------------------
KJ_TEST("newHttpService from HttpClient") {
auto PIPELINE_TESTS = pipelineTestCases();
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto frontPipe = KJ_HTTP_TEST_CREATE_2PIPE;
auto backPipe = KJ_HTTP_TEST_CREATE_2PIPE;
kj::Promise<void> writeResponsesPromise = kj::READY_NOW;
for (auto& testCase: PIPELINE_TESTS) {
writeResponsesPromise = writeResponsesPromise
.then([&]() {
return expectRead(*backPipe.ends[1], testCase.request.raw);
}).then([&]() {
return backPipe.ends[1]->write(testCase.response.raw.begin(), testCase.response.raw.size());
});
}
{
HttpHeaderTable table;
auto backClient = newHttpClient(table, *backPipe.ends[0]);
auto frontService = newHttpService(*backClient);
HttpServer frontServer(timer, table, *frontService);
auto listenTask = frontServer.listenHttp(kj::mv(frontPipe.ends[1]));
for (auto& testCase: PIPELINE_TESTS) {
KJ_CONTEXT(testCase.request.raw, testCase.response.raw);
frontPipe.ends[0]->write(testCase.request.raw.begin(), testCase.request.raw.size())
.wait(waitScope);
expectRead(*frontPipe.ends[0], testCase.response.raw).wait(waitScope);
}
frontPipe.ends[0]->shutdownWrite();
listenTask.wait(waitScope);
}
backPipe.ends[0]->shutdownWrite();
writeResponsesPromise.wait(waitScope);
}
KJ_TEST("newHttpService from HttpClient WebSockets") {
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto frontPipe = KJ_HTTP_TEST_CREATE_2PIPE;
auto backPipe = KJ_HTTP_TEST_CREATE_2PIPE;
auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE);
auto writeResponsesPromise = expectRead(*backPipe.ends[1], request)
.then([&]() { return writeA(*backPipe.ends[1], asBytes(WEBSOCKET_RESPONSE_HANDSHAKE)); })
.then([&]() { return writeA(*backPipe.ends[1], WEBSOCKET_FIRST_MESSAGE_INLINE); })
.then([&]() { return expectRead(*backPipe.ends[1], WEBSOCKET_SEND_MESSAGE); })
.then([&]() { return writeA(*backPipe.ends[1], WEBSOCKET_REPLY_MESSAGE); })
.then([&]() { return expectRead(*backPipe.ends[1], WEBSOCKET_SEND_CLOSE); })
.then([&]() { return writeA(*backPipe.ends[1], WEBSOCKET_REPLY_CLOSE); })
.then([&]() { return expectEnd(*backPipe.ends[1]); })
.then([&]() { backPipe.ends[1]->shutdownWrite(); })
.eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); });
{
HttpHeaderTable table;
FakeEntropySource entropySource;
HttpClientSettings clientSettings;
clientSettings.entropySource = entropySource;
auto backClientStream = kj::mv(backPipe.ends[0]);
auto backClient = newHttpClient(table, *backClientStream, clientSettings);
auto frontService = newHttpService(*backClient);
HttpServer frontServer(timer, table, *frontService);
auto listenTask = frontServer.listenHttp(kj::mv(frontPipe.ends[1]));
writeA(*frontPipe.ends[0], request.asBytes()).wait(waitScope);
expectRead(*frontPipe.ends[0], WEBSOCKET_RESPONSE_HANDSHAKE).wait(waitScope);
expectRead(*frontPipe.ends[0], WEBSOCKET_FIRST_MESSAGE_INLINE).wait(waitScope);
writeA(*frontPipe.ends[0], WEBSOCKET_SEND_MESSAGE).wait(waitScope);
expectRead(*frontPipe.ends[0], WEBSOCKET_REPLY_MESSAGE).wait(waitScope);
writeA(*frontPipe.ends[0], WEBSOCKET_SEND_CLOSE).wait(waitScope);
expectRead(*frontPipe.ends[0], WEBSOCKET_REPLY_CLOSE).wait(waitScope);
frontPipe.ends[0]->shutdownWrite();
listenTask.wait(waitScope);
}
writeResponsesPromise.wait(waitScope);
}
KJ_TEST("newHttpService from HttpClient WebSockets disconnect") {
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto frontPipe = KJ_HTTP_TEST_CREATE_2PIPE;
auto backPipe = KJ_HTTP_TEST_CREATE_2PIPE;
auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE);
auto writeResponsesPromise = expectRead(*backPipe.ends[1], request)
.then([&]() { return writeA(*backPipe.ends[1], asBytes(WEBSOCKET_RESPONSE_HANDSHAKE)); })
.then([&]() { return writeA(*backPipe.ends[1], WEBSOCKET_FIRST_MESSAGE_INLINE); })
.then([&]() { return expectRead(*backPipe.ends[1], WEBSOCKET_SEND_MESSAGE); })
.then([&]() { backPipe.ends[1]->shutdownWrite(); })
.eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); });
{
HttpHeaderTable table;
FakeEntropySource entropySource;
HttpClientSettings clientSettings;
clientSettings.entropySource = entropySource;
auto backClient = newHttpClient(table, *backPipe.ends[0], clientSettings);
auto frontService = newHttpService(*backClient);
HttpServer frontServer(timer, table, *frontService);
auto listenTask = frontServer.listenHttp(kj::mv(frontPipe.ends[1]));
writeA(*frontPipe.ends[0], request.asBytes()).wait(waitScope);
expectRead(*frontPipe.ends[0], WEBSOCKET_RESPONSE_HANDSHAKE).wait(waitScope);
expectRead(*frontPipe.ends[0], WEBSOCKET_FIRST_MESSAGE_INLINE).wait(waitScope);
writeA(*frontPipe.ends[0], WEBSOCKET_SEND_MESSAGE).wait(waitScope);
KJ_EXPECT(frontPipe.ends[0]->readAllText().wait(waitScope) == "");
frontPipe.ends[0]->shutdownWrite();
listenTask.wait(waitScope);
}
writeResponsesPromise.wait(waitScope);
}
// -----------------------------------------------------------------------------
KJ_TEST("newHttpClient from HttpService") {
auto PIPELINE_TESTS = pipelineTestCases();
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
HttpHeaderTable table;
TestHttpService service(PIPELINE_TESTS, table);
auto client = newHttpClient(service);
for (auto& testCase: PIPELINE_TESTS) {
testHttpClient(waitScope, table, *client, testCase);
}
}
KJ_TEST("newHttpClient from HttpService WebSockets") {
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
HttpHeaderTable::Builder tableBuilder;
HttpHeaderId hMyHeader = tableBuilder.add("My-Header");
auto headerTable = tableBuilder.build();
TestWebSocketService service(*headerTable, hMyHeader);
auto client = newHttpClient(service);
testWebSocketClient(waitScope, *headerTable, hMyHeader, *client);
}
KJ_TEST("adapted client/server propagates request exceptions like non-adapted client") {
KJ_HTTP_TEST_SETUP_IO;
HttpHeaderTable table;
HttpHeaders headers(table);
class FailingHttpClient final: public HttpClient {
public:
Request request(
HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::Maybe<uint64_t> expectedBodySize = nullptr) override {
KJ_FAIL_ASSERT("request_fail");
}
kj::Promise<WebSocketResponse> openWebSocket(
kj::StringPtr url, const HttpHeaders& headers) override {
KJ_FAIL_ASSERT("websocket_fail");
}
};
auto rawClient = kj::heap<FailingHttpClient>();
auto innerClient = kj::heap<FailingHttpClient>();
auto adaptedService = kj::newHttpService(*innerClient).attach(kj::mv(innerClient));
auto adaptedClient = kj::newHttpClient(*adaptedService).attach(kj::mv(adaptedService));
KJ_EXPECT_THROW_MESSAGE("request_fail", rawClient->request(HttpMethod::POST, "/"_kj, headers));
KJ_EXPECT_THROW_MESSAGE("request_fail", adaptedClient->request(HttpMethod::POST, "/"_kj, headers));
KJ_EXPECT_THROW_MESSAGE("websocket_fail", rawClient->openWebSocket("/"_kj, headers));
KJ_EXPECT_THROW_MESSAGE("websocket_fail", adaptedClient->openWebSocket("/"_kj, headers));
}
class DelayedCompletionHttpService final: public HttpService {
public:
DelayedCompletionHttpService(HttpHeaderTable& table, kj::Maybe<uint64_t> expectedLength)
: table(table), expectedLength(expectedLength) {}
kj::Promise<void> request(
HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::AsyncInputStream& requestBody, Response& response) override {
auto stream = response.send(200, "OK", HttpHeaders(table), expectedLength);
auto promise = stream->write("foo", 3);
return promise.attach(kj::mv(stream)).then([this]() {
return kj::mv(paf.promise);
});
}
kj::PromiseFulfiller<void>& getFulfiller() { return *paf.fulfiller; }
private:
HttpHeaderTable& table;
kj::Maybe<uint64_t> expectedLength;
kj::PromiseFulfillerPair<void> paf = kj::newPromiseAndFulfiller<void>();
};
void doDelayedCompletionTest(bool exception, kj::Maybe<uint64_t> expectedLength) noexcept {
KJ_HTTP_TEST_SETUP_IO;
HttpHeaderTable table;
DelayedCompletionHttpService service(table, expectedLength);
auto client = newHttpClient(service);
auto resp = client->request(HttpMethod::GET, "/", HttpHeaders(table), uint64_t(0))
.response.wait(waitScope);
KJ_EXPECT(resp.statusCode == 200);
// Read "foo" from the response body: works
char buffer[16];
KJ_ASSERT(resp.body->tryRead(buffer, 1, sizeof(buffer)).wait(waitScope) == 3);
buffer[3] = '\0';
KJ_EXPECT(buffer == "foo"_kj);
// But reading any more hangs.
auto promise = resp.body->tryRead(buffer, 1, sizeof(buffer));
KJ_EXPECT(!promise.poll(waitScope));
// Until we cause the service to return.
if (exception) {
service.getFulfiller().reject(KJ_EXCEPTION(FAILED, "service-side failure"));
} else {
service.getFulfiller().fulfill();
}
KJ_ASSERT(promise.poll(waitScope));
if (exception) {
KJ_EXPECT_THROW_MESSAGE("service-side failure", promise.wait(waitScope));
} else {
promise.wait(waitScope);
}
};
KJ_TEST("adapted client waits for service to complete before returning EOF on response stream") {
doDelayedCompletionTest(false, uint64_t(3));
}
KJ_TEST("adapted client waits for service to complete before returning EOF on chunked response") {
doDelayedCompletionTest(false, nullptr);
}
KJ_TEST("adapted client propagates throw from service after complete response body sent") {
doDelayedCompletionTest(true, uint64_t(3));
}
KJ_TEST("adapted client propagates throw from service after incomplete response body sent") {
doDelayedCompletionTest(true, uint64_t(6));
}
KJ_TEST("adapted client propagates throw from service after chunked response body sent") {
doDelayedCompletionTest(true, nullptr);
}
class DelayedCompletionWebSocketHttpService final: public HttpService {
public:
DelayedCompletionWebSocketHttpService(HttpHeaderTable& table, bool closeUpstreamFirst)
: table(table), closeUpstreamFirst(closeUpstreamFirst) {}
kj::Promise<void> request(
HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::AsyncInputStream& requestBody, Response& response) override {
KJ_ASSERT(headers.isWebSocket());
auto ws = response.acceptWebSocket(HttpHeaders(table));
kj::Promise<void> promise = kj::READY_NOW;
if (closeUpstreamFirst) {
// Wait for a close message from the client before starting.
promise = promise.then([&ws = *ws]() { return ws.receive(); }).ignoreResult();
}
promise = promise
.then([&ws = *ws]() { return ws.send("foo"_kj); })
.then([&ws = *ws]() { return ws.close(1234, "closed"_kj); });
if (!closeUpstreamFirst) {
// Wait for a close message from the client at the end.
promise = promise.then([&ws = *ws]() { return ws.receive(); }).ignoreResult();
}
return promise.attach(kj::mv(ws)).then([this]() {
return kj::mv(paf.promise);
});
}
kj::PromiseFulfiller<void>& getFulfiller() { return *paf.fulfiller; }
private:
HttpHeaderTable& table;
bool closeUpstreamFirst;
kj::PromiseFulfillerPair<void> paf = kj::newPromiseAndFulfiller<void>();
};
void doDelayedCompletionWebSocketTest(bool exception, bool closeUpstreamFirst) noexcept {
KJ_HTTP_TEST_SETUP_IO;
HttpHeaderTable table;
DelayedCompletionWebSocketHttpService service(table, closeUpstreamFirst);
auto client = newHttpClient(service);
auto resp = client->openWebSocket("/", HttpHeaders(table)).wait(waitScope);
auto ws = kj::mv(KJ_ASSERT_NONNULL(resp.webSocketOrBody.tryGet<kj::Own<WebSocket>>()));
if (closeUpstreamFirst) {
// Send "close" immediately.
ws->close(1234, "whatever"_kj).wait(waitScope);
}
// Read "foo" from the WebSocket: works
{
auto msg = ws->receive().wait(waitScope);
KJ_ASSERT(msg.is<kj::String>());
KJ_ASSERT(msg.get<kj::String>() == "foo");
}
kj::Promise<void> promise = nullptr;
if (closeUpstreamFirst) {
// Receiving the close hangs.
promise = ws->receive()
.then([](WebSocket::Message&& msg) { KJ_EXPECT(msg.is<WebSocket::Close>()); });
} else {
auto msg = ws->receive().wait(waitScope);
KJ_ASSERT(msg.is<WebSocket::Close>());
// Sending a close hangs.
promise = ws->close(1234, "whatever"_kj);
}
KJ_EXPECT(!promise.poll(waitScope));
// Until we cause the service to return.
if (exception) {
service.getFulfiller().reject(KJ_EXCEPTION(FAILED, "service-side failure"));
} else {
service.getFulfiller().fulfill();
}
KJ_ASSERT(promise.poll(waitScope));
if (exception) {
KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("service-side failure", promise.wait(waitScope));
} else {
promise.wait(waitScope);
}
};
KJ_TEST("adapted client waits for service to complete before completing upstream close on WebSocket") {
doDelayedCompletionWebSocketTest(false, false);
}
KJ_TEST("adapted client waits for service to complete before returning downstream close on WebSocket") {
doDelayedCompletionWebSocketTest(false, true);
}
KJ_TEST("adapted client propagates throw from service after WebSocket upstream close sent") {
doDelayedCompletionWebSocketTest(true, false);
}
KJ_TEST("adapted client propagates throw from service after WebSocket downstream close sent") {
doDelayedCompletionWebSocketTest(true, true);
}
// -----------------------------------------------------------------------------
class CountingIoStream final: public kj::AsyncIoStream {
// An AsyncIoStream wrapper which decrements a counter when destroyed (allowing us to count how
// many connections are open).
public:
CountingIoStream(kj::Own<kj::AsyncIoStream> inner, uint& count)
: inner(kj::mv(inner)), count(count) {}
~CountingIoStream() noexcept(false) {
--count;
}
kj::Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override {
return inner->read(buffer, minBytes, maxBytes);
}
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return inner->tryRead(buffer, minBytes, maxBytes);
}
kj::Maybe<uint64_t> tryGetLength() override {
return inner->tryGetLength();;
}
kj::Promise<uint64_t> pumpTo(kj::AsyncOutputStream& output, uint64_t amount) override {
return inner->pumpTo(output, amount);
}
kj::Promise<void> write(const void* buffer, size_t size) override {
return inner->write(buffer, size);
}
kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
return inner->write(pieces);
}
kj::Maybe<kj::Promise<uint64_t>> tryPumpFrom(
kj::AsyncInputStream& input, uint64_t amount = kj::maxValue) override {
return inner->tryPumpFrom(input, amount);
}
Promise<void> whenWriteDisconnected() override {
return inner->whenWriteDisconnected();
}
void shutdownWrite() override {
return inner->shutdownWrite();
}
void abortRead() override {
return inner->abortRead();
}
public:
kj::Own<AsyncIoStream> inner;
uint& count;
};
class CountingNetworkAddress final: public kj::NetworkAddress {
public:
CountingNetworkAddress(kj::NetworkAddress& inner, uint& count, uint& cumulative)
: inner(inner), count(count), addrCount(ownAddrCount), cumulative(cumulative) {}
CountingNetworkAddress(kj::Own<kj::NetworkAddress> inner, uint& count, uint& addrCount)
: inner(*inner), ownInner(kj::mv(inner)), count(count), addrCount(addrCount),
cumulative(ownCumulative) {}
~CountingNetworkAddress() noexcept(false) {
--addrCount;
}
kj::Promise<kj::Own<kj::AsyncIoStream>> connect() override {
++count;
++cumulative;
return inner.connect()
.then([this](kj::Own<kj::AsyncIoStream> stream) -> kj::Own<kj::AsyncIoStream> {
return kj::heap<CountingIoStream>(kj::mv(stream), count);
});
}
kj::Own<kj::ConnectionReceiver> listen() override { KJ_UNIMPLEMENTED("test"); }
kj::Own<kj::NetworkAddress> clone() override { KJ_UNIMPLEMENTED("test"); }
kj::String toString() override { KJ_UNIMPLEMENTED("test"); }
private:
kj::NetworkAddress& inner;
kj::Own<kj::NetworkAddress> ownInner;
uint& count;
uint ownAddrCount = 1;
uint& addrCount;
uint ownCumulative = 0;
uint& cumulative;
};
class ConnectionCountingNetwork final: public kj::Network {
public:
ConnectionCountingNetwork(kj::Network& inner, uint& count, uint& addrCount)
: inner(inner), count(count), addrCount(addrCount) {}
Promise<Own<NetworkAddress>> parseAddress(StringPtr addr, uint portHint = 0) override {
++addrCount;
return inner.parseAddress(addr, portHint)
.then([this](Own<NetworkAddress>&& addr) -> Own<NetworkAddress> {
return kj::heap<CountingNetworkAddress>(kj::mv(addr), count, addrCount);
});
}
Own<NetworkAddress> getSockaddr(const void* sockaddr, uint len) override {
KJ_UNIMPLEMENTED("test");
}
Own<Network> restrictPeers(
kj::ArrayPtr<const kj::StringPtr> allow,
kj::ArrayPtr<const kj::StringPtr> deny = nullptr) override {
KJ_UNIMPLEMENTED("test");
}
private:
kj::Network& inner;
uint& count;
uint& addrCount;
};
class DummyService final: public HttpService {
public:
DummyService(HttpHeaderTable& headerTable): headerTable(headerTable) {}
kj::Promise<void> request(
HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::AsyncInputStream& requestBody, Response& response) override {
if (!headers.isWebSocket()) {
if (url == "/throw") {
return KJ_EXCEPTION(FAILED, "client requested failure");
}
auto body = kj::str(headers.get(HttpHeaderId::HOST).orDefault("null"), ":", url);
auto stream = response.send(200, "OK", HttpHeaders(headerTable), body.size());
auto promises = kj::heapArrayBuilder<kj::Promise<void>>(2);
promises.add(stream->write(body.begin(), body.size()));
promises.add(requestBody.readAllBytes().ignoreResult());
return kj::joinPromises(promises.finish()).attach(kj::mv(stream), kj::mv(body));
} else {
auto ws = response.acceptWebSocket(HttpHeaders(headerTable));
auto body = kj::str(headers.get(HttpHeaderId::HOST).orDefault("null"), ":", url);
auto sendPromise = ws->send(body);
auto promises = kj::heapArrayBuilder<kj::Promise<void>>(2);
promises.add(sendPromise.attach(kj::mv(body)));
promises.add(ws->receive().ignoreResult());
return kj::joinPromises(promises.finish()).attach(kj::mv(ws));
}
}
private:
HttpHeaderTable& headerTable;
};
KJ_TEST("HttpClient connection management") {
KJ_HTTP_TEST_SETUP_IO;
KJ_HTTP_TEST_SETUP_LOOPBACK_LISTENER_AND_ADDR;
kj::TimerImpl serverTimer(kj::origin<kj::TimePoint>());
kj::TimerImpl clientTimer(kj::origin<kj::TimePoint>());
HttpHeaderTable headerTable;
DummyService service(headerTable);
HttpServerSettings serverSettings;
HttpServer server(serverTimer, headerTable, service, serverSettings);
auto listenTask = server.listenHttp(*listener);
uint count = 0;
uint cumulative = 0;
CountingNetworkAddress countingAddr(*addr, count, cumulative);
FakeEntropySource entropySource;
HttpClientSettings clientSettings;
clientSettings.entropySource = entropySource;
auto client = newHttpClient(clientTimer, headerTable, countingAddr, clientSettings);
KJ_EXPECT(count == 0);
KJ_EXPECT(cumulative == 0);
uint i = 0;
auto doRequest = [&]() {
uint n = i++;
return client->request(HttpMethod::GET, kj::str("/", n), HttpHeaders(headerTable)).response
.then([](HttpClient::Response&& response) {
auto promise = response.body->readAllText();
return promise.attach(kj::mv(response.body));
}).then([n](kj::String body) {
KJ_EXPECT(body == kj::str("null:/", n));
});
};
// We can do several requests in a row and only have one connection.
doRequest().wait(waitScope);
doRequest().wait(waitScope);
doRequest().wait(waitScope);
KJ_EXPECT(count == 1);
KJ_EXPECT(cumulative == 1);
// But if we do two in parallel, we'll end up with two connections.
auto req1 = doRequest();
auto req2 = doRequest();
req1.wait(waitScope);
req2.wait(waitScope);
KJ_EXPECT(count == 2);
KJ_EXPECT(cumulative == 2);
// We can reuse after a POST, provided we write the whole POST body properly.
{
auto req = client->request(
HttpMethod::POST, kj::str("/foo"), HttpHeaders(headerTable), size_t(6));
req.body->write("foobar", 6).wait(waitScope);
req.response.wait(waitScope).body->readAllBytes().wait(waitScope);
}
KJ_EXPECT(count == 2);
KJ_EXPECT(cumulative == 2);
doRequest().wait(waitScope);
KJ_EXPECT(count == 2);
KJ_EXPECT(cumulative == 2);
// Advance time for half the timeout, then exercise one of the connections.
clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimeout / 2);
doRequest().wait(waitScope);
doRequest().wait(waitScope);
waitScope.poll();
KJ_EXPECT(count == 2);
KJ_EXPECT(cumulative == 2);
// Advance time past when the other connection should time out. It should be dropped.
clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimeout * 3 / 4);
waitScope.poll();
KJ_EXPECT(count == 1);
KJ_EXPECT(cumulative == 2);
// Wait for the other to drop.
clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimeout / 2);
waitScope.poll();
KJ_EXPECT(count == 0);
KJ_EXPECT(cumulative == 2);
// New request creates a new connection again.
doRequest().wait(waitScope);
KJ_EXPECT(count == 1);
KJ_EXPECT(cumulative == 3);
// WebSocket connections are not reused.
client->openWebSocket(kj::str("/websocket"), HttpHeaders(headerTable))
.wait(waitScope);
KJ_EXPECT(count == 0);
KJ_EXPECT(cumulative == 3);
// Errored connections are not reused.
doRequest().wait(waitScope);
KJ_EXPECT(count == 1);
KJ_EXPECT(cumulative == 4);
client->request(HttpMethod::GET, kj::str("/throw"), HttpHeaders(headerTable)).response
.wait(waitScope).body->readAllBytes().wait(waitScope);
KJ_EXPECT(count == 0);
KJ_EXPECT(cumulative == 4);
// Connections where we failed to read the full response body are not reused.
doRequest().wait(waitScope);
KJ_EXPECT(count == 1);
KJ_EXPECT(cumulative == 5);
client->request(HttpMethod::GET, kj::str("/foo"), HttpHeaders(headerTable)).response
.wait(waitScope);
KJ_EXPECT(count == 0);
KJ_EXPECT(cumulative == 5);
// Connections where we didn't even wait for the response headers are not reused.
doRequest().wait(waitScope);
KJ_EXPECT(count == 1);
KJ_EXPECT(cumulative == 6);
client->request(HttpMethod::GET, kj::str("/foo"), HttpHeaders(headerTable));
KJ_EXPECT(count == 0);
KJ_EXPECT(cumulative == 6);
// Connections where we failed to write the full request body are not reused.
doRequest().wait(waitScope);
KJ_EXPECT(count == 1);
KJ_EXPECT(cumulative == 7);
client->request(HttpMethod::POST, kj::str("/foo"), HttpHeaders(headerTable), size_t(6)).response
.wait(waitScope).body->readAllBytes().wait(waitScope);
KJ_EXPECT(count == 0);
KJ_EXPECT(cumulative == 7);
// If the server times out the connection, we figure it out on the client.
doRequest().wait(waitScope);
// TODO(someday): Figure out why the following poll is necessary for the test to pass on Windows
// and Mac. Without it, it seems that the request's connection never starts, so the
// subsequent advanceTo() does not actually time out the connection.
waitScope.poll();
KJ_EXPECT(count == 1);
KJ_EXPECT(cumulative == 8);
serverTimer.advanceTo(serverTimer.now() + serverSettings.pipelineTimeout * 2);
waitScope.poll();
KJ_EXPECT(count == 0);
KJ_EXPECT(cumulative == 8);
// Can still make requests.
doRequest().wait(waitScope);
KJ_EXPECT(count == 1);
KJ_EXPECT(cumulative == 9);
}
KJ_TEST("HttpClient disable connection reuse") {
KJ_HTTP_TEST_SETUP_IO;
KJ_HTTP_TEST_SETUP_LOOPBACK_LISTENER_AND_ADDR;
kj::TimerImpl serverTimer(kj::origin<kj::TimePoint>());
kj::TimerImpl clientTimer(kj::origin<kj::TimePoint>());
HttpHeaderTable headerTable;
DummyService service(headerTable);
HttpServerSettings serverSettings;
HttpServer server(serverTimer, headerTable, service, serverSettings);
auto listenTask = server.listenHttp(*listener);
uint count = 0;
uint cumulative = 0;
CountingNetworkAddress countingAddr(*addr, count, cumulative);
FakeEntropySource entropySource;
HttpClientSettings clientSettings;
clientSettings.entropySource = entropySource;
clientSettings.idleTimeout = 0 * kj::SECONDS;
auto client = newHttpClient(clientTimer, headerTable, countingAddr, clientSettings);
KJ_EXPECT(count == 0);
KJ_EXPECT(cumulative == 0);
uint i = 0;
auto doRequest = [&]() {
uint n = i++;
return client->request(HttpMethod::GET, kj::str("/", n), HttpHeaders(headerTable)).response
.then([](HttpClient::Response&& response) {
auto promise = response.body->readAllText();
return promise.attach(kj::mv(response.body));
}).then([n](kj::String body) {
KJ_EXPECT(body == kj::str("null:/", n));
});
};
// Each serial request gets its own connection.
doRequest().wait(waitScope);
doRequest().wait(waitScope);
doRequest().wait(waitScope);
KJ_EXPECT(count == 0);
KJ_EXPECT(cumulative == 3);
// Each parallel request gets its own connection.
auto req1 = doRequest();
auto req2 = doRequest();
req1.wait(waitScope);
req2.wait(waitScope);
KJ_EXPECT(count == 0);
KJ_EXPECT(cumulative == 5);
}
KJ_TEST("HttpClient concurrency limiting") {
#if KJ_HTTP_TEST_USE_OS_PIPE && !__linux__
// On Windows and Mac, OS event delivery is not always immediate, and that seems to make this
// test flakey. On Linux, events are always immediately delivered. For now, we compile the test
// but we don't run it outside of Linux. We do run the in-memory-pipes version on all OSs since
// that mode shouldn't depend on kernel behavior at all.
return;
#endif
KJ_HTTP_TEST_SETUP_IO;
KJ_HTTP_TEST_SETUP_LOOPBACK_LISTENER_AND_ADDR;
kj::TimerImpl serverTimer(kj::origin<kj::TimePoint>());
kj::TimerImpl clientTimer(kj::origin<kj::TimePoint>());
HttpHeaderTable headerTable;
DummyService service(headerTable);
HttpServerSettings serverSettings;
HttpServer server(serverTimer, headerTable, service, serverSettings);
auto listenTask = server.listenHttp(*listener);
uint count = 0;
uint cumulative = 0;
CountingNetworkAddress countingAddr(*addr, count, cumulative);
FakeEntropySource entropySource;
HttpClientSettings clientSettings;
clientSettings.entropySource = entropySource;
clientSettings.idleTimeout = 0 * kj::SECONDS;
auto innerClient = newHttpClient(clientTimer, headerTable, countingAddr, clientSettings);
struct CallbackEvent {
uint runningCount;
uint pendingCount;
bool operator==(const CallbackEvent& other) const {
return runningCount == other.runningCount && pendingCount == other.pendingCount;
}
bool operator!=(const CallbackEvent& other) const { return !(*this == other); }
// TODO(someday): Can use default spaceship operator in C++20:
//auto operator<=>(const CallbackEvent&) const = default;
};
kj::Vector<CallbackEvent> callbackEvents;
auto callback = [&](uint runningCount, uint pendingCount) {
callbackEvents.add(CallbackEvent{runningCount, pendingCount});
};
auto client = newConcurrencyLimitingHttpClient(*innerClient, 1, kj::mv(callback));
KJ_EXPECT(count == 0);
KJ_EXPECT(cumulative == 0);
uint i = 0;
auto doRequest = [&]() {
uint n = i++;
return client->request(HttpMethod::GET, kj::str("/", n), HttpHeaders(headerTable)).response
.then([](HttpClient::Response&& response) {
auto promise = response.body->readAllText();
return promise.attach(kj::mv(response.body));
}).then([n](kj::String body) {
KJ_EXPECT(body == kj::str("null:/", n));
});
};
// Second connection blocked by first.
auto req1 = doRequest();
KJ_EXPECT(callbackEvents == kj::ArrayPtr<const CallbackEvent>({ {1, 0} }));
callbackEvents.clear();
auto req2 = doRequest();
// TODO(someday): Figure out why this poll() is necessary on Windows and macOS.
waitScope.poll();
KJ_EXPECT(req1.poll(waitScope));
KJ_EXPECT(!req2.poll(waitScope));
KJ_EXPECT(count == 1);
KJ_EXPECT(cumulative == 1);
KJ_EXPECT(callbackEvents == kj::ArrayPtr<const CallbackEvent>({ {1, 1} }));
callbackEvents.clear();
// Releasing first connection allows second to start.
req1.wait(waitScope);
KJ_EXPECT(req2.poll(waitScope));
KJ_EXPECT(count == 1);
KJ_EXPECT(cumulative == 2);
KJ_EXPECT(callbackEvents == kj::ArrayPtr<const CallbackEvent>({ {1, 0} }));
callbackEvents.clear();
req2.wait(waitScope);
KJ_EXPECT(count == 0);
KJ_EXPECT(cumulative == 2);
KJ_EXPECT(callbackEvents == kj::ArrayPtr<const CallbackEvent>({ {0, 0} }));
callbackEvents.clear();
// Using body stream after releasing blocked response promise throws no exception
auto req3 = doRequest();
{
kj::Own<kj::AsyncOutputStream> req4Body;
{
auto req4 = client->request(HttpMethod::GET, kj::str("/", ++i), HttpHeaders(headerTable));
waitScope.poll();
req4Body = kj::mv(req4.body);
}
auto writePromise = req4Body->write("a", 1);
KJ_EXPECT(!writePromise.poll(waitScope));
}
req3.wait(waitScope);
KJ_EXPECT(count == 0);
KJ_EXPECT(cumulative == 3);
// Similar connection limiting for web sockets
// TODO(someday): Figure out why the sequencing of websockets events does
// not work correctly on Windows (and maybe macOS?). The solution is not as
// simple as inserting poll()s as above, since doing so puts the websocket in
// a state that trips a "previous HTTP message body incomplete" assertion,
// while trying to write 500 network response.
callbackEvents.clear();
auto ws1 = kj::heap(client->openWebSocket(kj::str("/websocket"), HttpHeaders(headerTable)));
KJ_EXPECT(callbackEvents == kj::ArrayPtr<const CallbackEvent>({ {1, 0} }));
callbackEvents.clear();
auto ws2 = kj::heap(client->openWebSocket(kj::str("/websocket"), HttpHeaders(headerTable)));
KJ_EXPECT(ws1->poll(waitScope));
KJ_EXPECT(!ws2->poll(waitScope));
KJ_EXPECT(count == 1);
KJ_EXPECT(cumulative == 4);
KJ_EXPECT(callbackEvents == kj::ArrayPtr<const CallbackEvent>({ {1, 1} }));
callbackEvents.clear();
{
auto response1 = ws1->wait(waitScope);
KJ_EXPECT(!ws2->poll(waitScope));
KJ_EXPECT(callbackEvents == kj::ArrayPtr<const CallbackEvent>({}));
}
KJ_EXPECT(ws2->poll(waitScope));
KJ_EXPECT(count == 1);
KJ_EXPECT(cumulative == 5);
KJ_EXPECT(callbackEvents == kj::ArrayPtr<const CallbackEvent>({ {1, 0} }));
callbackEvents.clear();
{
auto response2 = ws2->wait(waitScope);
KJ_EXPECT(callbackEvents == kj::ArrayPtr<const CallbackEvent>({}));
}
KJ_EXPECT(count == 0);
KJ_EXPECT(cumulative == 5);
KJ_EXPECT(callbackEvents == kj::ArrayPtr<const CallbackEvent>({ {0, 0} }));
}
#if KJ_HTTP_TEST_USE_OS_PIPE
// TODO(someday): Implement mock kj::Network for userspace version of this test?
KJ_TEST("HttpClient multi host") {
auto io = kj::setupAsyncIo();
kj::TimerImpl serverTimer(kj::origin<kj::TimePoint>());
kj::TimerImpl clientTimer(kj::origin<kj::TimePoint>());
HttpHeaderTable headerTable;
auto listener1 = io.provider->getNetwork().parseAddress("localhost", 0)
.wait(io.waitScope)->listen();
auto listener2 = io.provider->getNetwork().parseAddress("localhost", 0)
.wait(io.waitScope)->listen();
DummyService service(headerTable);
HttpServer server(serverTimer, headerTable, service);
auto listenTask1 = server.listenHttp(*listener1);
auto listenTask2 = server.listenHttp(*listener2);
uint count = 0, addrCount = 0;
uint tlsCount = 0, tlsAddrCount = 0;
ConnectionCountingNetwork countingNetwork(io.provider->getNetwork(), count, addrCount);
ConnectionCountingNetwork countingTlsNetwork(io.provider->getNetwork(), tlsCount, tlsAddrCount);
HttpClientSettings clientSettings;
auto client = newHttpClient(clientTimer, headerTable,
countingNetwork, countingTlsNetwork, clientSettings);
KJ_EXPECT(count == 0);
uint i = 0;
auto doRequest = [&](bool tls, uint port) {
uint n = i++;
// We stick a double-slash in the URL to test that it doesn't get coalesced into one slash,
// which was a bug in the past.
return client->request(HttpMethod::GET,
kj::str((tls ? "https://localhost:" : "http://localhost:"), port, "//", n),
HttpHeaders(headerTable)).response
.then([](HttpClient::Response&& response) {
auto promise = response.body->readAllText();
return promise.attach(kj::mv(response.body));
}).then([n, port](kj::String body) {
KJ_EXPECT(body == kj::str("localhost:", port, "://", n), body, port, n);
});
};
uint port1 = listener1->getPort();
uint port2 = listener2->getPort();
// We can do several requests in a row to the same host and only have one connection.
doRequest(false, port1).wait(io.waitScope);
doRequest(false, port1).wait(io.waitScope);
doRequest(false, port1).wait(io.waitScope);
KJ_EXPECT(count == 1);
KJ_EXPECT(tlsCount == 0);
KJ_EXPECT(addrCount == 1);
KJ_EXPECT(tlsAddrCount == 0);
// Request a different host, and now we have two connections.
doRequest(false, port2).wait(io.waitScope);
KJ_EXPECT(count == 2);
KJ_EXPECT(tlsCount == 0);
KJ_EXPECT(addrCount == 2);
KJ_EXPECT(tlsAddrCount == 0);
// Try TLS.
doRequest(true, port1).wait(io.waitScope);
KJ_EXPECT(count == 2);
KJ_EXPECT(tlsCount == 1);
KJ_EXPECT(addrCount == 2);
KJ_EXPECT(tlsAddrCount == 1);
// Try first host again, no change in connection count.
doRequest(false, port1).wait(io.waitScope);
KJ_EXPECT(count == 2);
KJ_EXPECT(tlsCount == 1);
KJ_EXPECT(addrCount == 2);
KJ_EXPECT(tlsAddrCount == 1);
// Multiple requests in parallel forces more connections to that host.
auto promise1 = doRequest(false, port1);
auto promise2 = doRequest(false, port1);
promise1.wait(io.waitScope);
promise2.wait(io.waitScope);
KJ_EXPECT(count == 3);
KJ_EXPECT(tlsCount == 1);
KJ_EXPECT(addrCount == 2);
KJ_EXPECT(tlsAddrCount == 1);
// Let everything expire.
clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimeout * 2);
io.waitScope.poll();
KJ_EXPECT(count == 0);
KJ_EXPECT(tlsCount == 0);
KJ_EXPECT(addrCount == 0);
KJ_EXPECT(tlsAddrCount == 0);
// We can still request those hosts again.
doRequest(false, port1).wait(io.waitScope);
KJ_EXPECT(count == 1);
KJ_EXPECT(tlsCount == 0);
KJ_EXPECT(addrCount == 1);
KJ_EXPECT(tlsAddrCount == 0);
}
#endif
// -----------------------------------------------------------------------------
#if KJ_HTTP_TEST_USE_OS_PIPE
// This test only makes sense using the real network.
KJ_TEST("HttpClient to capnproto.org") {
auto io = kj::setupAsyncIo();
auto maybeConn = io.provider->getNetwork().parseAddress("capnproto.org", 80)
.then([](kj::Own<kj::NetworkAddress> addr) {
auto promise = addr->connect();
return promise.attach(kj::mv(addr));
}).then([](kj::Own<kj::AsyncIoStream>&& connection) -> kj::Maybe<kj::Own<kj::AsyncIoStream>> {
return kj::mv(connection);
}, [](kj::Exception&& e) -> kj::Maybe<kj::Own<kj::AsyncIoStream>> {
KJ_LOG(WARNING, "skipping test because couldn't connect to capnproto.org");
return nullptr;
}).wait(io.waitScope);
KJ_IF_MAYBE(conn, maybeConn) {
// Successfully connected to capnproto.org. Try doing GET /. We expect to get a redirect to
// HTTPS, because what kind of horrible web site would serve in plaintext, really?
HttpHeaderTable table;
auto client = newHttpClient(table, **conn);
HttpHeaders headers(table);
headers.set(HttpHeaderId::HOST, "capnproto.org");
auto response = client->request(HttpMethod::GET, "/", headers).response.wait(io.waitScope);
KJ_EXPECT(response.statusCode / 100 == 3);
auto location = KJ_ASSERT_NONNULL(response.headers->get(HttpHeaderId::LOCATION));
KJ_EXPECT(location == "https://capnproto.org/");
auto body = response.body->readAllText().wait(io.waitScope);
}
}
#endif
// =======================================================================================
// Misc bugfix tests
class ReadCancelHttpService final: public HttpService {
// HttpService that tries to read all request data but cancels after 1ms and sends a response.
public:
ReadCancelHttpService(kj::Timer& timer, HttpHeaderTable& headerTable)
: timer(timer), headerTable(headerTable) {}
kj::Promise<void> request(
HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::AsyncInputStream& requestBody, Response& responseSender) override {
if (method == HttpMethod::POST) {
// Try to read all content, but cancel after 1ms.
return requestBody.readAllBytes().ignoreResult()
.exclusiveJoin(timer.afterDelay(1 * kj::MILLISECONDS))
.then([this, &responseSender]() {
responseSender.send(408, "Request Timeout", kj::HttpHeaders(headerTable), uint64_t(0));
});
} else {
responseSender.send(200, "OK", kj::HttpHeaders(headerTable), uint64_t(0));
return kj::READY_NOW;
}
}
private:
kj::Timer& timer;
HttpHeaderTable& headerTable;
};
KJ_TEST("canceling a length stream mid-read correctly discards rest of request") {
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
HttpHeaderTable table;
ReadCancelHttpService service(timer, table);
HttpServer server(timer, table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
{
static constexpr kj::StringPtr REQUEST =
"POST / HTTP/1.1\r\n"
"Content-Length: 6\r\n"
"\r\n"
"fooba"_kj; // incomplete
pipe.ends[1]->write(REQUEST.begin(), REQUEST.size()).wait(waitScope);
auto promise = expectRead(*pipe.ends[1],
"HTTP/1.1 408 Request Timeout\r\n"
"Content-Length: 0\r\n"
"\r\n"_kj);
KJ_EXPECT(!promise.poll(waitScope));
// Trigger timeout, then response should be sent.
timer.advanceTo(timer.now() + 1 * kj::MILLISECONDS);
KJ_ASSERT(promise.poll(waitScope));
promise.wait(waitScope);
}
// We left our request stream hanging. The server will try to read and discard the request body.
// Let's give it the rest of the data, followed by a second request.
{
static constexpr kj::StringPtr REQUEST =
"r"
"GET / HTTP/1.1\r\n"
"\r\n"_kj;
pipe.ends[1]->write(REQUEST.begin(), REQUEST.size()).wait(waitScope);
auto promise = expectRead(*pipe.ends[1],
"HTTP/1.1 200 OK\r\n"
"Content-Length: 0\r\n"
"\r\n"_kj);
KJ_ASSERT(promise.poll(waitScope));
promise.wait(waitScope);
}
}
KJ_TEST("canceling a chunked stream mid-read correctly discards rest of request") {
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
HttpHeaderTable table;
ReadCancelHttpService service(timer, table);
HttpServer server(timer, table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
{
static constexpr kj::StringPtr REQUEST =
"POST / HTTP/1.1\r\n"
"Transfer-Encoding: chunked\r\n"
"\r\n"
"6\r\n"
"fooba"_kj; // incomplete chunk
pipe.ends[1]->write(REQUEST.begin(), REQUEST.size()).wait(waitScope);
auto promise = expectRead(*pipe.ends[1],
"HTTP/1.1 408 Request Timeout\r\n"
"Content-Length: 0\r\n"
"\r\n"_kj);
KJ_EXPECT(!promise.poll(waitScope));
// Trigger timeout, then response should be sent.
timer.advanceTo(timer.now() + 1 * kj::MILLISECONDS);
KJ_ASSERT(promise.poll(waitScope));
promise.wait(waitScope);
}
// We left our request stream hanging. The server will try to read and discard the request body.
// Let's give it the rest of the data, followed by a second request.
{
static constexpr kj::StringPtr REQUEST =
"r\r\n"
"4a\r\n"
"this is some text that is the body of a chunk and not a valid chunk header\r\n"
"0\r\n"
"\r\n"
"GET / HTTP/1.1\r\n"
"\r\n"_kj;
pipe.ends[1]->write(REQUEST.begin(), REQUEST.size()).wait(waitScope);
auto promise = expectRead(*pipe.ends[1],
"HTTP/1.1 200 OK\r\n"
"Content-Length: 0\r\n"
"\r\n"_kj);
KJ_ASSERT(promise.poll(waitScope));
promise.wait(waitScope);
}
}
KJ_TEST("drain() doesn't lose bytes when called at the wrong moment") {
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = KJ_HTTP_TEST_CREATE_2PIPE;
HttpHeaderTable table;
DummyService service(table);
HttpServer server(timer, table, service);
auto listenTask = server.listenHttpCleanDrain(*pipe.ends[0]);
// Do a regular request.
static constexpr kj::StringPtr REQUEST =
"GET / HTTP/1.1\r\n"
"Host: example.com\r\n"
"\r\n"_kj;
pipe.ends[1]->write(REQUEST.begin(), REQUEST.size()).wait(waitScope);
expectRead(*pipe.ends[1],
"HTTP/1.1 200 OK\r\n"
"Content-Length: 13\r\n"
"\r\n"
"example.com:/"_kj).wait(waitScope);
// Make sure the server is blocked on the next read from the socket.
kj::Promise<void>(kj::NEVER_DONE).poll(waitScope);
// Now simultaneously deliver a new request AND drain the socket.
auto drainPromise = server.drain();
static constexpr kj::StringPtr REQUEST2 =
"GET /foo HTTP/1.1\r\n"
"Host: example.com\r\n"
"\r\n"_kj;
pipe.ends[1]->write(REQUEST2.begin(), REQUEST2.size()).wait(waitScope);
#if KJ_HTTP_TEST_USE_OS_PIPE
// In the case of an OS pipe, the drain will complete before any data is read from the socket.
drainPromise.wait(waitScope);
// The HTTP server should indicate the connection was released but still valid.
KJ_ASSERT(listenTask.wait(waitScope));
// The request will not have been read off the socket. We can read it now.
pipe.ends[1]->shutdownWrite();
KJ_EXPECT(pipe.ends[0]->readAllText().wait(waitScope) == REQUEST2);
#else
// In the case of an in-memory pipe, the write() will have delivered bytes directly to the
// destination buffer synchronously, which means that the server must handle the request
// before draining.
KJ_EXPECT(!drainPromise.poll(waitScope));
// The HTTP request should get a response.
expectRead(*pipe.ends[1],
"HTTP/1.1 200 OK\r\n"
"Content-Length: 16\r\n"
"\r\n"
"example.com:/foo"_kj).wait(waitScope);
// Now the drain completes.
drainPromise.wait(waitScope);
// The HTTP server should indicate the connection was released but still valid.
KJ_ASSERT(listenTask.wait(waitScope));
#endif
}
class BrokenConnectionListener final: public kj::ConnectionReceiver {
public:
void fulfillOne(kj::Own<kj::AsyncIoStream> stream) {
fulfiller->fulfill(kj::mv(stream));
}
kj::Promise<kj::Own<kj::AsyncIoStream>> accept() override {
auto paf = kj::newPromiseAndFulfiller<kj::Own<kj::AsyncIoStream>>();
fulfiller = kj::mv(paf.fulfiller);
return kj::mv(paf.promise);
}
uint getPort() override {
KJ_UNIMPLEMENTED("not used");
}
private:
kj::Own<kj::PromiseFulfiller<kj::Own<kj::AsyncIoStream>>> fulfiller;
};
class BrokenConnection final: public kj::AsyncIoStream {
public:
Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return KJ_EXCEPTION(FAILED, "broken");
}
Promise<void> write(const void* buffer, size_t size) override {
return KJ_EXCEPTION(FAILED, "broken");
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
return KJ_EXCEPTION(FAILED, "broken");
}
Promise<void> whenWriteDisconnected() override {
return kj::NEVER_DONE;
}
void shutdownWrite() override {}
};
KJ_TEST("HttpServer.listenHttp() doesn't prematurely terminate if an accepted connection is broken") {
KJ_HTTP_TEST_SETUP_IO;
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
HttpHeaderTable table;
DummyService service(table);
HttpServer server(timer, table, service);
BrokenConnectionListener listener;
auto promise = server.listenHttp(listener).eagerlyEvaluate(nullptr);
// Loop is waiting for a connection.
KJ_ASSERT(!promise.poll(waitScope));
KJ_EXPECT_LOG(ERROR, "failed: broken");
listener.fulfillOne(kj::heap<BrokenConnection>());
// The loop should not have stopped, even though the connection was broken.
KJ_ASSERT(!promise.poll(waitScope));
}
} // namespace
} // namespace kj