Threading RS ForEach.

Change-Id: I5d6fe4db2b6ac0613394bc5a066ff90ec146d60e
diff --git a/rsContext.cpp b/rsContext.cpp
index 68eca44..629b481 100644
--- a/rsContext.cpp
+++ b/rsContext.cpp
@@ -23,6 +23,7 @@
 
 #include <sys/types.h>
 #include <sys/resource.h>
+#include <sched.h>
 
 #include <cutils/properties.h>
 
@@ -355,6 +356,49 @@
      return NULL;
 }
 
+// Entry point for each ForEach worker thread; vrsc is the owning Context.
+// A worker takes an index into the mWorkers arrays, then repeatedly waits on
+// its launch signal, runs the callback published by launchThreads(), and
+// reports completion via mRunningCount / mCompleteSignal. Returns NULL.
+void * Context::helperThreadProc(void *vrsc)
+{
+    Context *rsc = static_cast<Context *>(vrsc);
+    // The value obtained from the atomic increment is this worker's index.
+    uint32_t idx = (uint32_t)android_atomic_inc(&rsc->mWorkers.mLaunchCount);
+    LOGV("RS helperThread starting %p idx=%u", rsc, idx);
+
+    rsc->mWorkers.mLaunchSignals[idx].init();
+    rsc->mWorkers.mNativeThreadId[idx] = gettid();
+    // TODO: consider pinning workers to cores with sched_setaffinity().
+    setpriority(PRIO_PROCESS, rsc->mWorkers.mNativeThreadId[idx], rsc->mThreadPriority);
+
+    // Wait before testing mRunning: it may still be false while workers start
+    // (the creator waits for it only after spawning them), so a test-first
+    // loop could exit here and leave launchThreads() blocked on mRunningCount.
+    do {
+        rsc->mWorkers.mLaunchSignals[idx].wait();
+        if (rsc->mWorkers.mLaunchCallback) {
+            rsc->mWorkers.mLaunchCallback(rsc->mWorkers.mLaunchData, idx);
+        }
+        android_atomic_dec(&rsc->mWorkers.mRunningCount);
+        rsc->mWorkers.mCompleteSignal.set();
+    } while (rsc->mRunning);
+    return NULL;
+}
+
+// Publishes one job (cbk, data) to the worker threads, wakes each of them,
+// and blocks the caller until mRunningCount has dropped back to zero.
+void Context::launchThreads(WorkerCallback_t cbk, void *data)
+{
+    mWorkers.mLaunchData = data;
+    mWorkers.mLaunchCallback = cbk;
+    mWorkers.mRunningCount = (int)mWorkers.mCount;
+    for (uint32_t worker = 0; worker < mWorkers.mCount; worker++)
+        mWorkers.mLaunchSignals[worker].set();
+    while (mWorkers.mRunningCount)
+        mWorkers.mCompleteSignal.wait();
+}
+
 void Context::setPriority(int32_t p)
 {
     // Note: If we put this in the proper "background" policy
@@ -371,7 +415,10 @@
         // success; reset the priority as well
     }
 #else
-        setpriority(PRIO_PROCESS, mNativeThreadId, p);
+    setpriority(PRIO_PROCESS, mNativeThreadId, p);
+    for (uint32_t ct=0; ct < mWorkers.mCount; ct++) {
+        setpriority(PRIO_PROCESS, mWorkers.mNativeThreadId[ct], p);
+    }
 #endif
 }
 
@@ -421,10 +468,26 @@
     timerInit();
     timerSet(RS_TIMER_INTERNAL);
 
-    LOGV("RS Launching thread");
+    LOGV("RS Launching thread(s)");
+    mWorkers.mCount = 2;
+    mWorkers.mThreadId = (pthread_t *) calloc(mWorkers.mCount, sizeof(pthread_t));
+    mWorkers.mNativeThreadId = (pid_t *) calloc(mWorkers.mCount, sizeof(pid_t));
+    mWorkers.mLaunchSignals = new Signal[mWorkers.mCount];
+    mWorkers.mLaunchCallback = NULL;
     status = pthread_create(&mThreadId, &threadAttr, threadProc, this);
     if (status) {
         LOGE("Failed to start rs context thread.");
+        return;
+    }
+    mWorkers.mRunningCount = 0;
+    mWorkers.mLaunchCount = 0;
+    for (uint32_t ct=0; ct < mWorkers.mCount; ct++) {
+        status = pthread_create(&mWorkers.mThreadId[ct], &threadAttr, helperThreadProc, this);
+        if (status) {
+            mWorkers.mCount = ct;
+            LOGE("Created fewer than expected number of RS threads.");
+            break;
+        }
     }
 
     while(!mRunning) {