Implement RS VSync on new vsync infrastructure.
Change-Id: I662159a086a56e28732dd64a3a3cb30f8d4b72b1
Replace lockless fifo from server to client with sockets.
Change-Id: I99a4ab4f18496c0fbac96ee7b8099797af4712ea
diff --git a/rsThreadIO.cpp b/rsThreadIO.cpp
index 1917774..8e4b988 100644
--- a/rsThreadIO.cpp
+++ b/rsThreadIO.cpp
@@ -18,227 +18,189 @@
#include "rsThreadIO.h"
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+
+#include <fcntl.h>
+#include <poll.h>
+
+
using namespace android;
using namespace android::renderscript;
-ThreadIO::ThreadIO() : mUsingSocket(false) {
+// Starts the object in the running state: mRunning is the loop condition of
+// playCoreCommands() and is cleared again by shutdown().
+ThreadIO::ThreadIO() {
+ mRunning = true;
}
+// Destructor; the body is empty.
ThreadIO::~ThreadIO() {
}
-void ThreadIO::init(bool useSocket) {
- mUsingSocket = useSocket;
- mToCore.init(16 * 1024);
-
- if (mUsingSocket) {
- mToClientSocket.init();
- mToCoreSocket.init();
- } else {
- mToClient.init(1024);
- }
+// Initializes both channels, mToClient and mToCore, by calling their
+// argument-less init().
+void ThreadIO::init() {
+ mToClient.init();
+ mToCore.init();
}
+// Clears mRunning, which is the loop condition of playCoreCommands(), and
+// then shuts down the mToCore channel.
void ThreadIO::shutdown() {
- //ALOGE("shutdown 1");
+ mRunning = false;
mToCore.shutdown();
- //ALOGE("shutdown 2");
-}
-
-void ThreadIO::coreFlush() {
- //ALOGE("coreFlush 1");
- if (mUsingSocket) {
- } else {
- mToCore.flush();
- }
- //ALOGE("coreFlush 2");
}
+// Stages a core command in mSendBuffer: a CoreCmdHeader holding cmdID and the
+// payload size is written at the start of the buffer and the total message
+// length (header + payload) is recorded in mSendLen.
+//
+// cmdID   - command identifier stored in the header.
+// dataLen - payload size in bytes.
+//
+// Returns a pointer to the payload area directly after the header.
+// coreCommit() later sends mSendLen bytes starting at the beginning of
+// mSendBuffer.
void * ThreadIO::coreHeader(uint32_t cmdID, size_t dataLen) {
//ALOGE("coreHeader %i %i", cmdID, dataLen);
- if (mUsingSocket) {
- CoreCmdHeader hdr;
- hdr.bytes = dataLen;
- hdr.cmdID = cmdID;
- mToCoreSocket.writeAsync(&hdr, sizeof(hdr));
- } else {
- mCoreCommandSize = dataLen;
- mCoreCommandID = cmdID;
- mCoreDataPtr = (uint8_t *)mToCore.reserve(dataLen);
- mCoreDataBasePtr = mCoreDataPtr;
- }
- //ALOGE("coreHeader ret %p", mCoreDataPtr);
- return mCoreDataPtr;
-}
-
-void ThreadIO::coreData(const void *data, size_t dataLen) {
- //ALOGE("coreData %p %i", data, dataLen);
- mToCoreSocket.writeAsync(data, dataLen);
- //ALOGE("coreData ret %p", mCoreDataPtr);
+ // Flag a command that cannot fit in the staging buffer instead of letting
+ // the payload silently run past its end.
+ // NOTE(review): assumes mSendBuffer is a fixed-size array member so that
+ // sizeof() yields its capacity - confirm against rsThreadIO.h.
+ rsAssert(dataLen + sizeof(CoreCmdHeader) <= sizeof(mSendBuffer));
+ CoreCmdHeader *hdr = (CoreCmdHeader *)&mSendBuffer[0];
+ hdr->bytes = dataLen;
+ hdr->cmdID = cmdID;
+ mSendLen = dataLen + sizeof(CoreCmdHeader);
+ //ALOGE("coreHeader ret ");
+ return &mSendBuffer[sizeof(CoreCmdHeader)];
}
+// Sends the command staged by coreHeader(): mSendLen bytes starting at the
+// first byte of mSendBuffer are written to mToCore with writeAsync().
void ThreadIO::coreCommit() {
- //ALOGE("coreCommit %p %p %i", mCoreDataPtr, mCoreDataBasePtr, mCoreCommandSize);
- if (mUsingSocket) {
- } else {
- rsAssert((size_t)(mCoreDataPtr - mCoreDataBasePtr) <= mCoreCommandSize);
- mToCore.commit(mCoreCommandID, mCoreCommandSize);
- }
- //ALOGE("coreCommit ret");
-}
-
-void ThreadIO::coreCommitSync() {
- //ALOGE("coreCommitSync %p %p %i", mCoreDataPtr, mCoreDataBasePtr, mCoreCommandSize);
- if (mUsingSocket) {
- } else {
- rsAssert((size_t)(mCoreDataPtr - mCoreDataBasePtr) <= mCoreCommandSize);
- mToCore.commitSync(mCoreCommandID, mCoreCommandSize);
- }
- //ALOGE("coreCommitSync ret");
+ // Address the first byte explicitly, exactly as coreHeader() does, rather
+ // than taking the address of the buffer object itself.
+ mToCore.writeAsync(&mSendBuffer[0], mSendLen);
}
+// Shuts down the mToClient channel.
void ThreadIO::clientShutdown() {
- //ALOGE("coreShutdown 1");
mToClient.shutdown();
- //ALOGE("coreShutdown 2");
}
+// Passes a command's return value to mToCore.readReturn().
+//
+// data    - bytes of the return value; may be NULL.
+// dataLen - number of bytes at data (replaced when data is NULL).
+//
+// When data is NULL a zeroed 32-bit placeholder is passed instead, so
+// readReturn() is always called with a valid 4-byte buffer.
void ThreadIO::coreSetReturn(const void *data, size_t dataLen) {
- rsAssert(dataLen <= sizeof(mToCoreRet));
- memcpy(&mToCoreRet, data, dataLen);
+ // Zero-initialized on purpose: the placeholder is handed over through a
+ // const pointer, so the callee can only read it and must not be given an
+ // indeterminate stack value.
+ uint32_t buf = 0;
+ if (data == NULL) {
+ data = &buf;
+ dataLen = sizeof(buf);
+ }
+
+ mToCore.readReturn(data, dataLen);
}
+// Calls mToCore.writeWaitReturn() with the caller's buffer.
+//
+// data    - buffer handed to writeWaitReturn(); may be NULL.
+// dataLen - size of that buffer in bytes (replaced when data is NULL).
+//
+// When data is NULL a local 32-bit scratch variable is substituted, so
+// writeWaitReturn() is still called with a valid 4-byte buffer.
+// NOTE(review): presumably writeWaitReturn() fills the buffer with the
+// return value - confirm against the mToCore implementation.
void ThreadIO::coreGetReturn(void *data, size_t dataLen) {
- memcpy(data, &mToCoreRet, dataLen);
+ uint32_t buf;
+ if (data == NULL) {
+ data = &buf;
+ dataLen = sizeof(buf);
+ }
+
+ mToCore.writeWaitReturn(data, dataLen);
}
-void ThreadIO::setTimoutCallback(void (*cb)(void *), void *dat, uint64_t timeout) {
- mToCore.setTimoutCallback(cb, dat, timeout);
+// Currently a no-op: the forwarding call to mToCore is commented out, so cb,
+// dat and timeout are accepted but ignored.
+void ThreadIO::setTimeoutCallback(void (*cb)(void *), void *dat, uint64_t timeout) {
+ //mToCore.setTimeoutCallback(cb, dat, timeout);
}
-
-bool ThreadIO::playCoreCommands(Context *con, bool waitForCommand, uint64_t timeToWait) {
+// Reads commands from mToCore and dispatches them through gPlaybackFuncs
+// while mRunning is set.
+//
+// con            - context passed to every playback function; also used for
+//                  the optional idle/internal timers (props.mLogTimes).
+// waitForCommand - only assigned inside the loop; it does not influence the
+//                  control flow of this implementation.
+// waitFd         - optional second descriptor to poll together with the
+//                  command channel; a negative value means "none".
+//
+// Without a waitFd the first poll blocks, and once a command has been
+// processed further polls use a zero timeout, so the loop ends when no more
+// commands are pending. With a waitFd the loop ends when waitFd is signalled
+// and no command data was pending in that poll.
+//
+// Returns true if at least one command header was read successfully.
+bool ThreadIO::playCoreCommands(Context *con, bool waitForCommand, int waitFd) {
bool ret = false;
- uint64_t startTime = con->getTime();
- while (!mToCore.isEmpty() || waitForCommand) {
- uint32_t cmdID = 0;
- uint32_t cmdSize = 0;
- if (con->props.mLogTimes) {
- con->timerSet(Context::RS_TIMER_IDLE);
+ uint8_t buf[2 * 1024];
+ const CoreCmdHeader *cmd = (const CoreCmdHeader *)&buf[0];
+ const void * data = (const void *)&buf[sizeof(CoreCmdHeader)];
+
+ struct pollfd p[2];
+ p[0].fd = mToCore.getReadFd();
+ p[0].events = POLLIN;
+ p[0].revents = 0;
+ p[1].fd = waitFd;
+ p[1].events = POLLIN;
+ p[1].revents = 0;
+ int pollCount = 1;
+ if (waitFd >= 0) {
+ pollCount = 2;
+ }
+
+ if (con->props.mLogTimes) {
+ con->timerSet(Context::RS_TIMER_IDLE);
+ }
+
+ int waitTime = -1;
+ while (mRunning) {
+ int pr = poll(p, pollCount, waitTime);
+ if (pr <= 0) {
+ break;
}
- uint64_t delay = 0;
- if (waitForCommand) {
- delay = timeToWait - (con->getTime() - startTime);
- if (delay > timeToWait) {
- delay = 0;
+ if (p[0].revents) {
+ // Check the header read before using any header field: on a failed
+ // or short read cmd->bytes would be stale or uninitialized.
+ size_t r = mToCore.read(&buf[0], sizeof(CoreCmdHeader));
+ if (r != sizeof(CoreCmdHeader)) {
+ // exception or timeout occurred.
+ break;
+ }
+
+ // The payload lands in the fixed-size local buffer right after the
+ // header, so refuse any length that would overrun it. The stream
+ // cannot be resynchronized at this point, so stop processing.
+ if (cmd->bytes > (sizeof(buf) - sizeof(CoreCmdHeader))) {
+ ALOGE("playCoreCommands error con %p, cmd %i, payload too large %i", con, cmd->cmdID, (int)cmd->bytes);
+ break;
+ }
+ mToCore.read(&buf[sizeof(CoreCmdHeader)], cmd->bytes);
+
+ ret = true;
+ if (con->props.mLogTimes) {
+ con->timerSet(Context::RS_TIMER_INTERNAL);
+ }
+ waitForCommand = false;
+ //ALOGV("playCoreCommands 3 %i %i", cmd->cmdID, cmd->bytes);
+
+ if (cmd->cmdID >= (sizeof(gPlaybackFuncs) / sizeof(void *))) {
+ rsAssert(cmd->cmdID < (sizeof(gPlaybackFuncs) / sizeof(void *)));
+ ALOGE("playCoreCommands error con %p, cmd %i", con, cmd->cmdID);
+ } else {
+ // Dispatch only IDs that index inside gPlaybackFuncs; the payload
+ // of an unknown command has already been consumed above.
+ gPlaybackFuncs[cmd->cmdID](con, data, cmd->bytes);
+ }
+
+ if (con->props.mLogTimes) {
+ con->timerSet(Context::RS_TIMER_IDLE);
+ }
+
+ if (waitFd < 0) {
+ // If we don't have a secondary wait object we should stop blocking now
+ // that at least one command has been processed.
+ waitTime = 0;
}
}
- if (delay == 0 && timeToWait != 0 && mToCore.isEmpty()) {
+ if (p[1].revents && !p[0].revents) {
+ // We want to finish processing fifo events before processing the vsync.
+ // Otherwise we can end up falling behind and having tremendous lag.
break;
}
-
- const void * data = mToCore.get(&cmdID, &cmdSize, delay);
- if (!cmdSize) {
- // exception or timeout occurred.
- break;
- }
- ret = true;
- if (con->props.mLogTimes) {
- con->timerSet(Context::RS_TIMER_INTERNAL);
- }
- waitForCommand = false;
- //ALOGV("playCoreCommands 3 %i %i", cmdID, cmdSize);
-
- if (cmdID >= (sizeof(gPlaybackFuncs) / sizeof(void *))) {
- rsAssert(cmdID < (sizeof(gPlaybackFuncs) / sizeof(void *)));
- ALOGE("playCoreCommands error con %p, cmd %i", con, cmdID);
- mToCore.printDebugData();
- }
- gPlaybackFuncs[cmdID](con, data, cmdSize << 2);
- mToCore.next();
}
return ret;
}
+// Reads the next message header from mToClient into mLastClientHeader and
+// reports its fields to the caller.
+//
+// receiveLen - out: receiveLen[0] receives the header's payload byte count.
+// usrID      - out: usrID[0] receives the header's userID.
+//
+// Returns the header's cmdID cast to RsMessageToClientType. The result of
+// mToClient.read() is not checked here.
RsMessageToClientType ThreadIO::getClientHeader(size_t *receiveLen, uint32_t *usrID) {
- if (mUsingSocket) {
- mToClientSocket.read(&mLastClientHeader, sizeof(mLastClientHeader));
- } else {
- size_t bytesData = 0;
- const uint32_t *d = (const uint32_t *)mToClient.get(&mLastClientHeader.cmdID, (uint32_t*)&bytesData);
- if (bytesData >= sizeof(uint32_t)) {
- mLastClientHeader.userID = d[0];
- mLastClientHeader.bytes = bytesData - sizeof(uint32_t);
- } else {
- mLastClientHeader.userID = 0;
- mLastClientHeader.bytes = 0;
- }
- }
+ //ALOGE("getClientHeader");
+ mToClient.read(&mLastClientHeader, sizeof(mLastClientHeader));
+
receiveLen[0] = mLastClientHeader.bytes;
usrID[0] = mLastClientHeader.userID;
+ //ALOGE("getClientHeader %i %i %i", mLastClientHeader.cmdID, usrID[0], receiveLen[0]);
return (RsMessageToClientType)mLastClientHeader.cmdID;
}
+// Reads the payload that belongs to the header stored in mLastClientHeader.
+//
+// data       - destination for the payload bytes.
+// receiveLen - out: receiveLen[0] receives the payload byte count.
+// usrID      - out: usrID[0] receives the header's userID.
+// bufferLen  - capacity of data in bytes.
+//
+// Returns RS_MESSAGE_TO_CLIENT_RESIZE, without reading from mToClient, when
+// bufferLen is smaller than the payload; receiveLen[0] then tells the caller
+// how much room is needed. Otherwise reads the payload (skipped when it is
+// empty) and returns the header's cmdID cast to RsMessageToClientType.
RsMessageToClientType ThreadIO::getClientPayload(void *data, size_t *receiveLen,
uint32_t *usrID, size_t bufferLen) {
+ //ALOGE("getClientPayload");
receiveLen[0] = mLastClientHeader.bytes;
usrID[0] = mLastClientHeader.userID;
if (bufferLen < mLastClientHeader.bytes) {
return RS_MESSAGE_TO_CLIENT_RESIZE;
}
- if (mUsingSocket) {
- if (receiveLen[0]) {
- mToClientSocket.read(data, receiveLen[0]);
- }
- return (RsMessageToClientType)mLastClientHeader.cmdID;
- } else {
- uint32_t bytesData = 0;
- uint32_t commandID = 0;
- const uint32_t *d = (const uint32_t *)mToClient.get(&commandID, &bytesData);
- //ALOGE("getMessageToClient 3 %i %i", commandID, bytesData);
- //ALOGE("getMessageToClient %i %i", commandID, *subID);
- if (bufferLen >= receiveLen[0]) {
- memcpy(data, d+1, receiveLen[0]);
- mToClient.next();
- return (RsMessageToClientType)commandID;
- }
+ if (receiveLen[0]) {
+ mToClient.read(data, receiveLen[0]);
}
- return RS_MESSAGE_TO_CLIENT_RESIZE;
+ //ALOGE("getClientPayload x");
+ return (RsMessageToClientType)mLastClientHeader.cmdID;
}
+// Sends a message to the client over mToClient: first a ClientCmdHeader
+// (payload size, cmdID, usrID), then the payload itself when dataLen is
+// non-zero. Both parts are written with writeAsync().
+//
+// cmdID        - message type stored in the header.
+// usrID        - user-defined ID stored in the header.
+// data         - payload bytes; not accessed when dataLen is 0.
+// dataLen      - payload size in bytes.
+// waitForSpace - not used by this implementation.
+//
+// Always returns true; the results of the writeAsync() calls are not checked.
bool ThreadIO::sendToClient(RsMessageToClientType cmdID, uint32_t usrID, const void *data,
size_t dataLen, bool waitForSpace) {
+
+ //ALOGE("sendToClient %i %i %i", cmdID, usrID, (int)dataLen);
ClientCmdHeader hdr;
hdr.bytes = dataLen;
hdr.cmdID = cmdID;
hdr.userID = usrID;
- if (mUsingSocket) {
- mToClientSocket.writeAsync(&hdr, sizeof(hdr));
- if (dataLen) {
- mToClientSocket.writeAsync(data, dataLen);
- }
- return true;
- } else {
- if (!waitForSpace) {
- if (!mToClient.makeSpaceNonBlocking(dataLen + sizeof(hdr))) {
- // Not enough room, and not waiting.
- return false;
- }
- }
- //ALOGE("sendMessageToClient 2");
- uint32_t *p = (uint32_t *)mToClient.reserve(dataLen + sizeof(usrID));
- p[0] = usrID;
- if (dataLen > 0) {
- memcpy(p+1, data, dataLen);
- }
- mToClient.commit(cmdID, dataLen + sizeof(usrID));
- //ALOGE("sendMessageToClient 3");
- return true;
+ mToClient.writeAsync(&hdr, sizeof(hdr));
+ if (dataLen) {
+ mToClient.writeAsync(data, dataLen);
}
- return false;
+
+ //ALOGE("sendToClient x");
+ return true;
}