Implement RS VSync on the new vsync infrastructure.

Change-Id: I662159a086a56e28732dd64a3a3cb30f8d4b72b1

Replace the lockless FIFO from server to client with sockets.

Change-Id: I99a4ab4f18496c0fbac96ee7b8099797af4712ea
diff --git a/rsThreadIO.cpp b/rsThreadIO.cpp
index 1917774..8e4b988 100644
--- a/rsThreadIO.cpp
+++ b/rsThreadIO.cpp
@@ -18,227 +18,189 @@
 
 #include "rsThreadIO.h"
 
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+
+#include <fcntl.h>
+#include <poll.h>
+
+
 using namespace android;
 using namespace android::renderscript;
 
-ThreadIO::ThreadIO() : mUsingSocket(false) {
+ThreadIO::ThreadIO() {
+    mRunning = true;
 }
 
 ThreadIO::~ThreadIO() {
 }
 
-void ThreadIO::init(bool useSocket) {
-    mUsingSocket = useSocket;
-    mToCore.init(16 * 1024);
-
-    if (mUsingSocket) {
-        mToClientSocket.init();
-        mToCoreSocket.init();
-    } else {
-        mToClient.init(1024);
-    }
+void ThreadIO::init() {
+    mToClient.init();
+    mToCore.init();
 }
 
 void ThreadIO::shutdown() {
-    //ALOGE("shutdown 1");
+    mRunning = false;
     mToCore.shutdown();
-    //ALOGE("shutdown 2");
-}
-
-void ThreadIO::coreFlush() {
-    //ALOGE("coreFlush 1");
-    if (mUsingSocket) {
-    } else {
-        mToCore.flush();
-    }
-    //ALOGE("coreFlush 2");
 }
 
 void * ThreadIO::coreHeader(uint32_t cmdID, size_t dataLen) {
     //ALOGE("coreHeader %i %i", cmdID, dataLen);
-    if (mUsingSocket) {
-        CoreCmdHeader hdr;
-        hdr.bytes = dataLen;
-        hdr.cmdID = cmdID;
-        mToCoreSocket.writeAsync(&hdr, sizeof(hdr));
-    } else {
-        mCoreCommandSize = dataLen;
-        mCoreCommandID = cmdID;
-        mCoreDataPtr = (uint8_t *)mToCore.reserve(dataLen);
-        mCoreDataBasePtr = mCoreDataPtr;
-    }
-    //ALOGE("coreHeader ret %p", mCoreDataPtr);
-    return mCoreDataPtr;
-}
-
-void ThreadIO::coreData(const void *data, size_t dataLen) {
-    //ALOGE("coreData %p %i", data, dataLen);
-    mToCoreSocket.writeAsync(data, dataLen);
-    //ALOGE("coreData ret %p", mCoreDataPtr);
+    CoreCmdHeader *hdr = (CoreCmdHeader *)&mSendBuffer[0];
+    hdr->bytes = dataLen;
+    hdr->cmdID = cmdID;
+    mSendLen = dataLen + sizeof(CoreCmdHeader);
+    //mToCoreSocket.writeAsync(&hdr, sizeof(hdr));
+    //ALOGE("coreHeader ret ");
+    return &mSendBuffer[sizeof(CoreCmdHeader)];
 }
 
 void ThreadIO::coreCommit() {
-    //ALOGE("coreCommit %p %p %i", mCoreDataPtr, mCoreDataBasePtr, mCoreCommandSize);
-    if (mUsingSocket) {
-    } else {
-        rsAssert((size_t)(mCoreDataPtr - mCoreDataBasePtr) <= mCoreCommandSize);
-        mToCore.commit(mCoreCommandID, mCoreCommandSize);
-    }
-    //ALOGE("coreCommit ret");
-}
-
-void ThreadIO::coreCommitSync() {
-    //ALOGE("coreCommitSync %p %p %i", mCoreDataPtr, mCoreDataBasePtr, mCoreCommandSize);
-    if (mUsingSocket) {
-    } else {
-        rsAssert((size_t)(mCoreDataPtr - mCoreDataBasePtr) <= mCoreCommandSize);
-        mToCore.commitSync(mCoreCommandID, mCoreCommandSize);
-    }
-    //ALOGE("coreCommitSync ret");
+    mToCore.writeAsync(&mSendBuffer, mSendLen);
 }
 
 void ThreadIO::clientShutdown() {
-    //ALOGE("coreShutdown 1");
     mToClient.shutdown();
-    //ALOGE("coreShutdown 2");
 }
 
 void ThreadIO::coreSetReturn(const void *data, size_t dataLen) {
-    rsAssert(dataLen <= sizeof(mToCoreRet));
-    memcpy(&mToCoreRet, data, dataLen);
+    uint32_t buf;
+    if (data == NULL) {
+        data = &buf;
+        dataLen = sizeof(buf);
+    }
+
+    mToCore.readReturn(data, dataLen);
 }
 
 void ThreadIO::coreGetReturn(void *data, size_t dataLen) {
-    memcpy(data, &mToCoreRet, dataLen);
+    uint32_t buf;
+    if (data == NULL) {
+        data = &buf;
+        dataLen = sizeof(buf);
+    }
+
+    mToCore.writeWaitReturn(data, dataLen);
 }
 
-void ThreadIO::setTimoutCallback(void (*cb)(void *), void *dat, uint64_t timeout) {
-    mToCore.setTimoutCallback(cb, dat, timeout);
+void ThreadIO::setTimeoutCallback(void (*cb)(void *), void *dat, uint64_t timeout) {
+    //mToCore.setTimeoutCallback(cb, dat, timeout);
 }
 
-
-bool ThreadIO::playCoreCommands(Context *con, bool waitForCommand, uint64_t timeToWait) {
+bool ThreadIO::playCoreCommands(Context *con, bool waitForCommand, int waitFd) {
     bool ret = false;
-    uint64_t startTime = con->getTime();
 
-    while (!mToCore.isEmpty() || waitForCommand) {
-        uint32_t cmdID = 0;
-        uint32_t cmdSize = 0;
-        if (con->props.mLogTimes) {
-            con->timerSet(Context::RS_TIMER_IDLE);
+    uint8_t buf[2 * 1024];
+    const CoreCmdHeader *cmd = (const CoreCmdHeader *)&buf[0];
+    const void * data = (const void *)&buf[sizeof(CoreCmdHeader)];
+
+    struct pollfd p[2];
+    p[0].fd = mToCore.getReadFd();
+    p[0].events = POLLIN;
+    p[0].revents = 0;
+    p[1].fd = waitFd;
+    p[1].events = POLLIN;
+    p[1].revents = 0;
+    int pollCount = 1;
+    if (waitFd >= 0) {
+        pollCount = 2;
+    }
+
+    if (con->props.mLogTimes) {
+        con->timerSet(Context::RS_TIMER_IDLE);
+    }
+
+    int waitTime = -1;
+    while (mRunning) {
+        int pr = poll(p, pollCount, waitTime);
+        if (pr <= 0) {
+            break;
         }
 
-        uint64_t delay = 0;
-        if (waitForCommand) {
-            delay = timeToWait - (con->getTime() - startTime);
-            if (delay > timeToWait) {
-                delay = 0;
+        if (p[0].revents) {
+            size_t r = mToCore.read(&buf[0], sizeof(CoreCmdHeader));
+            mToCore.read(&buf[sizeof(CoreCmdHeader)], cmd->bytes);
+
+            if (r != sizeof(CoreCmdHeader)) {
+                // exception or timeout occurred.
+                break;
+            }
+
+            ret = true;
+            if (con->props.mLogTimes) {
+                con->timerSet(Context::RS_TIMER_INTERNAL);
+            }
+            waitForCommand = false;
+            //ALOGV("playCoreCommands 3 %i %i", cmd->cmdID, cmd->bytes);
+
+            if (cmd->cmdID >= (sizeof(gPlaybackFuncs) / sizeof(void *))) {
+                rsAssert(cmd->cmdID < (sizeof(gPlaybackFuncs) / sizeof(void *)));
+                ALOGE("playCoreCommands error con %p, cmd %i", con, cmd->cmdID);
+            }
+            gPlaybackFuncs[cmd->cmdID](con, data, cmd->bytes);
+
+            if (con->props.mLogTimes) {
+                con->timerSet(Context::RS_TIMER_IDLE);
+            }
+
+            if (waitFd < 0) {
+                // If we don't have a secondary wait object we should stop blocking now
+                // that at least one command has been processed.
+                waitTime = 0;
             }
         }
 
-        if (delay == 0 && timeToWait != 0 && mToCore.isEmpty()) {
+        if (p[1].revents && !p[0].revents) {
+            // We want to finish processing fifo events before processing the vsync.
+            // Otherwise we can end up falling behind and having tremendous lag.
             break;
         }
-
-        const void * data = mToCore.get(&cmdID, &cmdSize, delay);
-        if (!cmdSize) {
-            // exception or timeout occurred.
-            break;
-        }
-        ret = true;
-        if (con->props.mLogTimes) {
-            con->timerSet(Context::RS_TIMER_INTERNAL);
-        }
-        waitForCommand = false;
-        //ALOGV("playCoreCommands 3 %i %i", cmdID, cmdSize);
-
-        if (cmdID >= (sizeof(gPlaybackFuncs) / sizeof(void *))) {
-            rsAssert(cmdID < (sizeof(gPlaybackFuncs) / sizeof(void *)));
-            ALOGE("playCoreCommands error con %p, cmd %i", con, cmdID);
-            mToCore.printDebugData();
-        }
-        gPlaybackFuncs[cmdID](con, data, cmdSize << 2);
-        mToCore.next();
     }
     return ret;
 }
 
 RsMessageToClientType ThreadIO::getClientHeader(size_t *receiveLen, uint32_t *usrID) {
-    if (mUsingSocket) {
-        mToClientSocket.read(&mLastClientHeader, sizeof(mLastClientHeader));
-    } else {
-        size_t bytesData = 0;
-        const uint32_t *d = (const uint32_t *)mToClient.get(&mLastClientHeader.cmdID, (uint32_t*)&bytesData);
-        if (bytesData >= sizeof(uint32_t)) {
-            mLastClientHeader.userID = d[0];
-            mLastClientHeader.bytes = bytesData - sizeof(uint32_t);
-        } else {
-            mLastClientHeader.userID = 0;
-            mLastClientHeader.bytes = 0;
-        }
-    }
+    //ALOGE("getClientHeader");
+    mToClient.read(&mLastClientHeader, sizeof(mLastClientHeader));
+
     receiveLen[0] = mLastClientHeader.bytes;
     usrID[0] = mLastClientHeader.userID;
+    //ALOGE("getClientHeader %i %i %i", mLastClientHeader.cmdID, usrID[0], receiveLen[0]);
     return (RsMessageToClientType)mLastClientHeader.cmdID;
 }
 
 RsMessageToClientType ThreadIO::getClientPayload(void *data, size_t *receiveLen,
                                 uint32_t *usrID, size_t bufferLen) {
+    //ALOGE("getClientPayload");
     receiveLen[0] = mLastClientHeader.bytes;
     usrID[0] = mLastClientHeader.userID;
     if (bufferLen < mLastClientHeader.bytes) {
         return RS_MESSAGE_TO_CLIENT_RESIZE;
     }
-    if (mUsingSocket) {
-        if (receiveLen[0]) {
-            mToClientSocket.read(data, receiveLen[0]);
-        }
-        return (RsMessageToClientType)mLastClientHeader.cmdID;
-    } else {
-        uint32_t bytesData = 0;
-        uint32_t commandID = 0;
-        const uint32_t *d = (const uint32_t *)mToClient.get(&commandID, &bytesData);
-        //ALOGE("getMessageToClient 3    %i  %i", commandID, bytesData);
-        //ALOGE("getMessageToClient  %i %i", commandID, *subID);
-        if (bufferLen >= receiveLen[0]) {
-            memcpy(data, d+1, receiveLen[0]);
-            mToClient.next();
-            return (RsMessageToClientType)commandID;
-        }
+    if (receiveLen[0]) {
+        mToClient.read(data, receiveLen[0]);
     }
-    return RS_MESSAGE_TO_CLIENT_RESIZE;
+    //ALOGE("getClientPayload x");
+    return (RsMessageToClientType)mLastClientHeader.cmdID;
 }
 
 bool ThreadIO::sendToClient(RsMessageToClientType cmdID, uint32_t usrID, const void *data,
                             size_t dataLen, bool waitForSpace) {
+
+    //ALOGE("sendToClient %i %i %i", cmdID, usrID, (int)dataLen);
     ClientCmdHeader hdr;
     hdr.bytes = dataLen;
     hdr.cmdID = cmdID;
     hdr.userID = usrID;
-    if (mUsingSocket) {
-        mToClientSocket.writeAsync(&hdr, sizeof(hdr));
-        if (dataLen) {
-            mToClientSocket.writeAsync(data, dataLen);
-        }
-        return true;
-    } else {
-        if (!waitForSpace) {
-            if (!mToClient.makeSpaceNonBlocking(dataLen + sizeof(hdr))) {
-                // Not enough room, and not waiting.
-                return false;
-            }
-        }
 
-        //ALOGE("sendMessageToClient 2");
-        uint32_t *p = (uint32_t *)mToClient.reserve(dataLen + sizeof(usrID));
-        p[0] = usrID;
-        if (dataLen > 0) {
-            memcpy(p+1, data, dataLen);
-        }
-        mToClient.commit(cmdID, dataLen + sizeof(usrID));
-        //ALOGE("sendMessageToClient 3");
-        return true;
+    mToClient.writeAsync(&hdr, sizeof(hdr));
+    if (dataLen) {
+        mToClient.writeAsync(data, dataLen);
     }
-    return false;
+
+    //ALOGE("sendToClient x");
+    return true;
 }