// blob: ba0ee88b8e2d5e4e67c15b8de09c2af4d44434b0 [file] [log] [blame]
// Copyright (c) 2016 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#if _WIN32
// Request Vista-level APIs.
#include "win32-api-version.h"
#include "async-win32.h"
#include "debug.h"
#include <atomic>
#include "time.h"
#include "refcount.h"
#include <ntsecapi.h> // NTSTATUS
#include <ntstatus.h> // STATUS_SUCCESS
#undef ERROR // dammit windows.h
namespace kj {
// Constructs the event port: `clock` is bound to the system precise monotonic clock, `iocp`
// receives a freshly created I/O completion port (see newIocpHandle()), `thread` receives a
// duplicated handle to the thread running this constructor (see openCurrentThread()), and
// `timerImpl` is seeded with the clock's current reading.
Win32IocpEventPort::Win32IocpEventPort()
: clock(systemPreciseMonotonicClock()),
iocp(newIocpHandle()), thread(openCurrentThread()), timerImpl(clock.now()) {}
// No explicit teardown is performed here. The destructor is declared noexcept(false), so it is
// not implicitly noexcept.
Win32IocpEventPort::~Win32IocpEventPort() noexcept(false) {}
class Win32IocpEventPort::IoPromiseAdapter final: public OVERLAPPED {
  // Promise adapter that is itself the OVERLAPPED structure of one overlapped I/O operation.
  // Win32IocpEventPort::waitIocp() casts the OVERLAPPED pointer it dequeues back to this type and
  // calls done() on it, which fulfills the promise.

public:
  IoPromiseAdapter(PromiseFulfiller<IoResult>& fulfiller, Win32IocpEventPort& port,
                   uint64_t offset, IoPromiseAdapter** selfPtr)
      : fulfiller(fulfiller), port(port) {
    // Clear the OVERLAPPED base, then split the 64-bit file offset into its low and high 32-bit
    // halves.
    memset(implicitCast<OVERLAPPED*>(this), 0, sizeof(OVERLAPPED));
    this->Offset = static_cast<DWORD>(offset & 0xFFFFFFFFull);
    this->OffsetHigh = static_cast<DWORD>(offset >> 32);

    // Publish our own address to the creator through the out-pointer.
    *selfPtr = this;
  }

  ~IoPromiseAdapter() {
    if (handle == INVALID_HANDLE_VALUE) {
      // start() was never called, or done() already ran: no operation is enqueued.
      return;
    }

    // An operation is still enqueued, so it must be canceled.
    //
    // Note: Even if HasOverlappedIoCompleted(this) is true, CancelIoEx() still seems needed to
    //   force the completion event.
    if (!CancelIoEx(handle, this)) {
      DWORD error = GetLastError();

      // ERROR_NOT_FOUND probably means the operation already completed and is enqueued on the
      // IOCP.
      //
      // ERROR_INVALID_HANDLE probably means that, amid a mass of destructors, the HANDLE was
      // closed before all of the I/O promises were destroyed. We tolerate this so long as the
      // I/O promises are also destroyed before returning to the event loop, hence the I/O
      // tasks won't actually continue on a dead handle.
      //
      // TODO(cleanup): ERROR_INVALID_HANDLE really shouldn't be allowed. Unfortunately, the
      //   refcounted nature of capabilities and the RPC system seems to mean that objects
      //   are unwound in the wrong order in several of Cap'n Proto's tests. So we live with this
      //   for now. Note that even if a new handle is opened with the same numeric value, it
      //   should be harmless to call CancelIoEx() on it because it couldn't possibly be using
      //   the same OVERLAPPED structure.
      bool tolerated = error == ERROR_NOT_FOUND || error == ERROR_INVALID_HANDLE;
      if (!tolerated) {
        KJ_FAIL_WIN32("CancelIoEx()", error, handle);
      }
    }

    // Keep pumping the IOCP until done() has been invoked for this operation (which resets
    // `handle`); only then is it safe to destroy the OVERLAPPED.
    do {
      port.waitIocp(INFINITE);
    } while (handle != INVALID_HANDLE_VALUE);
  }

  void start(HANDLE handle) {
    // Records the handle on which an operation using this OVERLAPPED has been enqueued. May only
    // be called while no operation is recorded.
    KJ_ASSERT(this->handle == INVALID_HANDLE_VALUE);
    this->handle = handle;
  }

  void done(IoResult result) {
    // Called when the completion for this OVERLAPPED is dequeued. Clears the recorded handle
    // first, then fulfills the promise with `result`.
    KJ_ASSERT(handle != INVALID_HANDLE_VALUE);
    handle = INVALID_HANDLE_VALUE;
    fulfiller.fulfill(kj::mv(result));
  }

private:
  PromiseFulfiller<IoResult>& fulfiller;
  Win32IocpEventPort& port;

  HANDLE handle = INVALID_HANDLE_VALUE;
  // If an I/O operation is currently enqueued, the handle on which it is enqueued.
};
class Win32IocpEventPort::IoOperationImpl final: public Win32EventPort::IoOperation {
  // IoOperation implementation backed by an IoPromiseAdapter. The adapter lives inside `promise`;
  // `promiseAdapter` is a raw pointer to it that remains set until onComplete() is called.

public:
  explicit IoOperationImpl(Win32IocpEventPort& port, HANDLE handle, uint64_t offset)
      : handle(handle),
        // The adapter's constructor stores its own address into `promiseAdapter`.
        promise(newAdaptedPromise<IoResult, IoPromiseAdapter>(port, offset, &promiseAdapter)) {}

  LPOVERLAPPED getOverlapped() override {
    // Returns the OVERLAPPED for this operation. Not valid after onComplete().
    KJ_REQUIRE(promiseAdapter != nullptr, "already called onComplete()");
    LPOVERLAPPED overlapped = promiseAdapter;
    return overlapped;
  }

  Promise<IoResult> onComplete() override {
    // Marks the operation as enqueued on `handle` and hands the completion promise to the
    // caller. May be called only once.
    KJ_REQUIRE(promiseAdapter != nullptr, "can only call onComplete() once");
    IoPromiseAdapter& adapter = *promiseAdapter;
    adapter.start(handle);
    promiseAdapter = nullptr;
    return kj::mv(promise);
  }

private:
  // Declaration order matters: `promiseAdapter` must precede `promise`, whose initializer writes
  // it.
  HANDLE handle;
  IoPromiseAdapter* promiseAdapter;
  Promise<IoResult> promise;
};
class Win32IocpEventPort::IoObserverImpl final: public Win32EventPort::IoObserver {
  // IoObserver implementation for the IOCP event port. Construction associates `handle` with the
  // port's completion port; newOperation() then creates operations against that handle.

public:
  IoObserverImpl(Win32IocpEventPort& port, HANDLE handle)
      : port(port), handle(handle) {
    // Associate the handle with the port's IOCP (completion key 0). Fails with a Win32 error
    // exception if the association cannot be made.
    KJ_WIN32(CreateIoCompletionPort(handle, port.iocp, 0, 1), handle, port.iocp.get());
  }

  Own<IoOperation> newOperation(uint64_t offset) override {
    // Creates a new overlapped operation on the observed handle starting at byte `offset`.
    return heap<IoOperationImpl>(port, handle, offset);
  }

private:
  Win32IocpEventPort& port;
  HANDLE handle;
};
Own<Win32EventPort::IoObserver> Win32IocpEventPort::observeIo(HANDLE handle) {
  // Wraps `handle` in an observer bound to this port; the observer's constructor associates the
  // handle with this port's completion port.
  Own<Win32EventPort::IoObserver> observer = heap<IoObserverImpl>(*this, handle);
  return observer;
}
Own<Win32EventPort::SignalObserver> Win32IocpEventPort::observeSignalState(HANDLE handle) {
  // Signal-state observation is delegated entirely to the wait-thread pool.
  auto observer = waitThreads.observeSignalState(handle);
  return observer;
}
bool Win32IocpEventPort::wait() {
waitIocp(timerImpl.timeoutToNextEvent(clock.now(), MILLISECONDS, INFINITE - 1)
.map([](uint64_t t) -> DWORD { return t; })
.orDefault(INFINITE));
timerImpl.advanceTo(clock.now());
return receivedWake();
}
bool Win32IocpEventPort::poll() {
waitIocp(0);
return receivedWake();
}
void Win32IocpEventPort::wake() const {
  // Wakes the event loop by posting an empty completion packet (null OVERLAPPED) to the IOCP.
  // `sentWake` coalesces wake-ups: a packet is posted only by the caller that flips the flag
  // from false to true; receivedWake() resets it.
  //
  // The test-and-set is a single atomic exchange so that concurrent callers cannot both observe
  // `false` and each post a redundant packet.
  if (!sentWake.exchange(true, std::memory_order_acq_rel)) {
    KJ_WIN32(PostQueuedCompletionStatus(iocp, 0, 0, nullptr));
  }
}
void Win32IocpEventPort::waitIocp(DWORD timeoutMs) {
  // Dequeues at most one completion packet from the IOCP, waiting up to `timeoutMs`
  // milliseconds. A packet carrying an OVERLAPPED is dispatched to the IoPromiseAdapter that owns
  // it; a packet without one (e.g. from wake()) and a timeout are both silently ignored. Any
  // other failure is raised as a Win32 error.

  if (!isAllowApc) {
    // Plain (non-alertable) wait.
    DWORD bytesTransferred;
    ULONG_PTR completionKey;
    LPOVERLAPPED overlapped = nullptr;

    BOOL success = GetQueuedCompletionStatus(
        iocp, &bytesTransferred, &completionKey, &overlapped, timeoutMs);

    if (overlapped != nullptr) {
      // A packet for an I/O operation was dequeued; on failure the error belongs to that
      // operation rather than to the wait itself.
      DWORD error = success ? ERROR_SUCCESS : GetLastError();
      static_cast<IoPromiseAdapter*>(overlapped)->done(IoResult { error, bytesTransferred });
      return;
    }

    if (success) {
      // wake() called in another thread.
      return;
    }

    DWORD error = GetLastError();
    if (error != WAIT_TIMEOUT) {
      // (Why a timeout is reported as WAIT_TIMEOUT and not ERROR_TIMEOUT I'm not sure.)
      KJ_FAIL_WIN32("GetQueuedCompletionStatus()", error, error, overlapped);
    }
    return;
  }

  // Alertable wait, so that queued APCs may run on this thread.
  ULONG countReceived = 0;
  OVERLAPPED_ENTRY entry;
  memset(&entry, 0, sizeof(entry));

  if (!GetQueuedCompletionStatusEx(iocp, &entry, 1, &countReceived, timeoutMs, TRUE)) {
    // Call failed.
    DWORD error = GetLastError();

    // WAIT_TIMEOUT = timed out (dunno why this isn't ERROR_TIMEOUT??)
    // WAIT_IO_COMPLETION = APC queued
    // Either way, nothing to do. Anything else is a real failure.
    if (error != WAIT_TIMEOUT && error != WAIT_IO_COMPLETION) {
      KJ_FAIL_WIN32("GetQueuedCompletionStatusEx()", error, error, entry.lpOverlapped);
    }
    return;
  }

  KJ_ASSERT(countReceived == 1);

  if (entry.lpOverlapped == nullptr) {
    // wake() called in another thread, or APC queued.
    return;
  }

  // The operation's NTSTATUS is stored in OVERLAPPED::Internal; translate anything other than
  // success into a Win32 error code.
  auto status = entry.lpOverlapped->Internal;
  DWORD error = ERROR_SUCCESS;
  if (status != STATUS_SUCCESS) {
    error = LsaNtStatusToWinError(status);
  }

  static_cast<IoPromiseAdapter*>(entry.lpOverlapped)
      ->done(IoResult { error, entry.dwNumberOfBytesTransferred });
}
bool Win32IocpEventPort::receivedWake() {
  // Reports whether wake() has been called since the last check, clearing the flag if so.
  const bool woken = sentWake.load(std::memory_order_acquire);
  if (woken) {
    sentWake.store(false, std::memory_order_release);
  }
  return woken;
}
AutoCloseHandle Win32IocpEventPort::newIocpHandle() {
  // Creates a new I/O completion port that is not yet associated with any file handle, and
  // returns it wrapped in an AutoCloseHandle. Fails with a Win32 error if creation fails.
  HANDLE h = nullptr;
  KJ_WIN32(h = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 1));

  AutoCloseHandle owned(h);
  return owned;
}
AutoCloseHandle Win32IocpEventPort::openCurrentThread() {
  // Duplicates the calling thread's handle (as returned by GetCurrentThread()) within the
  // current process, with the same access rights, and returns the duplicate wrapped in an
  // AutoCloseHandle. Fails with a Win32 error if duplication fails.
  HANDLE process = GetCurrentProcess();
  HANDLE result = nullptr;

  KJ_WIN32(DuplicateHandle(process, GetCurrentThread(), process, &result,
                           0, FALSE, DUPLICATE_SAME_ACCESS));

  AutoCloseHandle owned(result);
  return owned;
}
// =======================================================================================
// Constructor. `mainThreadCount` is currently unused and the body sets up no state.
Win32WaitObjectThreadPool::Win32WaitObjectThreadPool(uint mainThreadCount) {}
// Not implemented: always fails via KJ_UNIMPLEMENTED instead of returning an observer for
// `handle`.
Own<Win32EventPort::SignalObserver> Win32WaitObjectThreadPool::observeSignalState(HANDLE handle) {
KJ_UNIMPLEMENTED("wait for win32 handles");
}
// Not implemented: always fails via KJ_UNIMPLEMENTED; `handles` is never touched and no count is
// returned.
uint Win32WaitObjectThreadPool::prepareMainThreadWait(HANDLE* handles[]) {
KJ_UNIMPLEMENTED("wait for win32 handles");
}
// Not implemented: always fails via KJ_UNIMPLEMENTED; `returnCode` is ignored.
bool Win32WaitObjectThreadPool::finishedMainThreadWait(DWORD returnCode) {
KJ_UNIMPLEMENTED("wait for win32 handles");
}
} // namespace kj
#endif // _WIN32