Move the core-to-client FIFO onto sockets.

Change-Id: I3b84a7d4c3c5fa0d764ad4db22dfd142d5cfa95b
diff --git a/rsContext.cpp b/rsContext.cpp
index 98adabc..6027946 100644
--- a/rsContext.cpp
+++ b/rsContext.cpp
@@ -344,6 +344,8 @@
 bool Context::initContext(Device *dev, const RsSurfaceConfig *sc) {
     pthread_mutex_lock(&gInitMutex);
 
+    mIO.init();
+
     dev->addContext(this);
     mDev = dev;
     if (sc) {
@@ -393,7 +395,7 @@
 Context::~Context() {
     LOGV("Context::~Context");
 
-    mIO.mToCore.flush();
+    mIO.coreFlush();
     rsAssert(mExit);
     mExit = true;
     mPaused = false;
@@ -505,69 +507,18 @@
     }
 }
 
-RsMessageToClientType Context::peekMessageToClient(size_t *receiveLen, uint32_t *subID, bool wait) {
-    *receiveLen = 0;
-    if (!wait && mIO.mToClient.isEmpty()) {
-        return RS_MESSAGE_TO_CLIENT_NONE;
-    }
-
-    uint32_t bytesData = 0;
-    uint32_t commandID = 0;
-    const uint32_t *d = (const uint32_t *)mIO.mToClient.get(&commandID, &bytesData);
-    *receiveLen = bytesData - sizeof(uint32_t);
-    if (bytesData) {
-        *subID = d[0];
-    }
-    return (RsMessageToClientType)commandID;
+RsMessageToClientType Context::peekMessageToClient(size_t *receiveLen, uint32_t *subID) {
+    return (RsMessageToClientType)mIO.getClientHeader(receiveLen, subID);
 }
 
-RsMessageToClientType Context::getMessageToClient(void *data, size_t *receiveLen, uint32_t *subID, size_t bufferLen, bool wait) {
-    //LOGE("getMessageToClient %i %i", bufferLen, wait);
-    *receiveLen = 0;
-    if (!wait && mIO.mToClient.isEmpty()) {
-        return RS_MESSAGE_TO_CLIENT_NONE;
-    }
-
-    //LOGE("getMessageToClient 2 con=%p", this);
-    uint32_t bytesData = 0;
-    uint32_t commandID = 0;
-    const uint32_t *d = (const uint32_t *)mIO.mToClient.get(&commandID, &bytesData);
-    //LOGE("getMessageToClient 3    %i  %i", commandID, bytesData);
-
-    *receiveLen = bytesData - sizeof(uint32_t);
-    *subID = d[0];
-
-    //LOGE("getMessageToClient  %i %i", commandID, *subID);
-    if (bufferLen >= (*receiveLen)) {
-        memcpy(data, d+1, *receiveLen);
-        mIO.mToClient.next();
-        return (RsMessageToClientType)commandID;
-    }
-    return RS_MESSAGE_TO_CLIENT_RESIZE;
+RsMessageToClientType Context::getMessageToClient(void *data, size_t *receiveLen, uint32_t *subID, size_t bufferLen) {
+    return (RsMessageToClientType)mIO.getClientPayload(data, receiveLen, subID, bufferLen);
 }
 
 bool Context::sendMessageToClient(const void *data, RsMessageToClientType cmdID,
                                   uint32_t subID, size_t len, bool waitForSpace) const {
-    //LOGE("sendMessageToClient %i %i %i %i", cmdID, subID, len, waitForSpace);
-    if (cmdID == 0) {
-        LOGE("Attempting to send invalid command 0 to client.");
-        return false;
-    }
-    if (!waitForSpace) {
-        if (!mIO.mToClient.makeSpaceNonBlocking(len + 12)) {
-            // Not enough room, and not waiting.
-            return false;
-        }
-    }
-    //LOGE("sendMessageToClient 2");
-    uint32_t *p = (uint32_t *)mIO.mToClient.reserve(len + sizeof(subID));
-    p[0] = subID;
-    if (len > 0) {
-        memcpy(p+1, data, len);
-    }
-    mIO.mToClient.commit(cmdID, len + sizeof(subID));
-    //LOGE("sendMessageToClient 3");
-    return true;
+
+    return mIO.sendToClient(cmdID, subID, data, len, waitForSpace);
 }
 
 void Context::initToClient() {
@@ -577,7 +528,7 @@
 }
 
 void Context::deinitToClient() {
-    mIO.mToClient.shutdown();
+    mIO.clientShutdown();
 }
 
 void Context::setError(RsError e, const char *msg) const {
@@ -706,16 +657,16 @@
 
 RsMessageToClientType rsi_ContextPeekMessage(Context *rsc,
                                            size_t * receiveLen, size_t receiveLen_length,
-                                           uint32_t * subID, size_t subID_length, bool wait) {
-    return rsc->peekMessageToClient(receiveLen, subID, wait);
+                                           uint32_t * subID, size_t subID_length) {
+    return rsc->peekMessageToClient(receiveLen, subID);
 }
 
 RsMessageToClientType rsi_ContextGetMessage(Context *rsc, void * data, size_t data_length,
                                           size_t * receiveLen, size_t receiveLen_length,
-                                          uint32_t * subID, size_t subID_length, bool wait) {
+                                          uint32_t * subID, size_t subID_length) {
     rsAssert(subID_length == sizeof(uint32_t));
     rsAssert(receiveLen_length == sizeof(size_t));
-    return rsc->getMessageToClient(data, receiveLen, subID, data_length, wait);
+    return rsc->getMessageToClient(data, receiveLen, subID, data_length);
 }
 
 void rsi_ContextInitToClient(Context *rsc) {