oboe: stop stream if callback requests it

Stop in a separate thread for AAudio before Q.
Call stop directly for OpenSL ES.

Possible fix for #277
diff --git a/include/oboe/AudioStream.h b/include/oboe/AudioStream.h
index 5898b9f..01748d1 100644
--- a/include/oboe/AudioStream.h
+++ b/include/oboe/AudioStream.h
@@ -371,6 +371,11 @@
         return nullptr;
     }
 
+    /**
+     * Launch a thread that will stop the stream.
+     */
+    void launchStopThread();
+
 protected:
 
     /**
@@ -402,7 +407,7 @@
      * @return the result of the callback: stop or continue
      *
      */
-    DataCallbackResult fireCallback(void *audioData, int numFrames);
+    DataCallbackResult fireDataCallback(void *audioData, int numFrames);
 
     /**
      * Update mFramesWritten.
@@ -415,7 +420,22 @@
     virtual void updateFramesRead() = 0;
 
     /**
-     * Number of frames which have been written into the stream.
+     * @return true if callbacks may be called
+     */
+    bool isDataCallbackEnabled() {
+        return mDataCallbackEnabled;
+    }
+
+    /**
+     * This can be set false internally to prevent callbacks
+     * after DataCallbackResult::Stop has been returned.
+     */
+    void setDataCallbackEnabled(bool enabled) {
+        mDataCallbackEnabled = enabled;
+    }
+
+    /**
+     * Number of frames which have been written into the stream
      *
      * This is signed integer to match the counters in AAudio.
      * At audio rates, the counter will overflow in about six million years.
@@ -430,9 +450,13 @@
      */
     std::atomic<int64_t> mFramesRead{};
 
+    std::mutex           mLock; // for synchronizing start/stop/close
+
 private:
     int                  mPreviousScheduler = -1;
 
+    std::atomic<bool>    mDataCallbackEnabled{};
+
 };
 
 } // namespace oboe
diff --git a/src/aaudio/AAudioLoader.cpp b/src/aaudio/AAudioLoader.cpp
index 9a863c2..326e2e4 100644
--- a/src/aaudio/AAudioLoader.cpp
+++ b/src/aaudio/AAudioLoader.cpp
@@ -108,13 +108,15 @@
                                                  int64_t timeoutNanoseconds)>
             (dlsym(mLibHandle, "AAudioStream_waitForStateChange"));
 
-
     stream_getTimestamp = reinterpret_cast<aaudio_result_t (*)(AAudioStream *stream,
-                                           clockid_t clockid,
-                                           int64_t *framePosition,
-                                           int64_t *timeNanoseconds)>
+                                                               clockid_t clockid,
+                                                               int64_t *framePosition,
+                                                               int64_t *timeNanoseconds)>
             (dlsym(mLibHandle, "AAudioStream_getTimestamp"));
 
+    stream_isMMapUsed = reinterpret_cast<bool (*)(AAudioStream *stream)>
+            (dlsym(mLibHandle, "AAudioStream_isMMapUsed"));
+
     stream_getChannelCount    = load_I_PS("AAudioStream_getChannelCount");
     if (stream_getChannelCount == nullptr) {
         // Use old alias if needed.
diff --git a/src/aaudio/AAudioLoader.h b/src/aaudio/AAudioLoader.h
index f39926e..9f8cee3 100644
--- a/src/aaudio/AAudioLoader.h
+++ b/src/aaudio/AAudioLoader.h
@@ -113,6 +113,9 @@
                                           int64_t *framePosition,
                                           int64_t *timeNanoseconds);
 
+
+    bool            (*stream_isMMapUsed)(AAudioStream *stream) = nullptr;
+
     signature_I_PS   stream_close = nullptr;
 
     signature_I_PS   stream_getChannelCount = nullptr;
diff --git a/src/aaudio/AudioStreamAAudio.cpp b/src/aaudio/AudioStreamAAudio.cpp
index e2c3bf8..018e0bf 100644
--- a/src/aaudio/AudioStreamAAudio.cpp
+++ b/src/aaudio/AudioStreamAAudio.cpp
@@ -38,9 +38,10 @@
         int32_t numFrames) {
 
     AudioStreamAAudio *oboeStream = reinterpret_cast<AudioStreamAAudio*>(userData);
-    if (oboeStream != NULL) {
+    if (oboeStream != nullptr) {
         return static_cast<aaudio_data_callback_result_t>(
                 oboeStream->callOnAudioReady(stream, audioData, numFrames));
+
     } else {
         return static_cast<aaudio_data_callback_result_t>(DataCallbackResult::Stop);
     }
@@ -49,7 +50,7 @@
 static void oboe_aaudio_error_thread_proc(AudioStreamAAudio *oboeStream,
                                           AAudioStream *stream,
                                           Result error) {
-    if (oboeStream != NULL) {
+    if (oboeStream != nullptr) {
         oboeStream->onErrorInThread(stream, error);
     }
 }
@@ -231,10 +232,28 @@
 DataCallbackResult AudioStreamAAudio::callOnAudioReady(AAudioStream *stream,
                                                                  void *audioData,
                                                                  int32_t numFrames) {
-    return mStreamCallback->onAudioReady(
-            this,
-            audioData,
-            numFrames);
+    DataCallbackResult result = fireDataCallback(audioData, numFrames);
+    if (result == DataCallbackResult::Continue) {
+        return result;
+    } else {
+        if (result == DataCallbackResult::Stop) {
+            LOGE("Oboe callback returned DataCallbackResult::Stop");
+        } else {
+            LOGE("Oboe callback returned unexpected value = %d", result);
+        }
+
+        if (getSdkVersion() <= __ANDROID_API_P__) {
+            launchStopThread();
+            if (isMMapUsed()) {
+                return DataCallbackResult::Stop;
+            } else {
+                // Legacy stream <= API_P cannot be restarted after returning Stop.
+                return DataCallbackResult::Continue;
+            }
+        } else {
+            return DataCallbackResult::Stop; // OK >= API_Q
+        }
+    }
 }
 
 void AudioStreamAAudio::onErrorInThread(AAudioStream *stream, Result error) {
@@ -263,6 +282,7 @@
                 return Result::OK;
             }
         }
+        setDataCallbackEnabled(true);
         return static_cast<Result>(mLibLoader->stream_requestStart(stream));
     } else {
         return Result::ErrorClosed;
@@ -508,4 +528,13 @@
     return ResultWithValue<double>(latencyMillis);
 }
 
+bool AudioStreamAAudio::isMMapUsed() {
+    AAudioStream *stream = mAAudioStream.load();
+    if (stream != nullptr) {
+        return mLibLoader->stream_isMMapUsed(stream);
+    } else {
+        return false;
+    }
+}
+
 } // namespace oboe
diff --git a/src/aaudio/AudioStreamAAudio.h b/src/aaudio/AudioStreamAAudio.h
index ccabe05..8316d62 100644
--- a/src/aaudio/AudioStreamAAudio.h
+++ b/src/aaudio/AudioStreamAAudio.h
@@ -109,9 +109,10 @@
 
 private:
 
+    bool                 isMMapUsed();
+
     std::atomic<bool>    mCallbackThreadEnabled;
 
-    std::mutex           mLock; // for synchronizing start/stop/close
     std::atomic<AAudioStream *> mAAudioStream{nullptr};
 
     static AAudioLoader *mLibLoader;
diff --git a/src/common/AudioStream.cpp b/src/common/AudioStream.cpp
index 966bc36..5dfafb1 100644
--- a/src/common/AudioStream.cpp
+++ b/src/common/AudioStream.cpp
@@ -16,6 +16,7 @@
 
 #include <sys/types.h>
 #include <pthread.h>
+#include <thread>
 #include <oboe/AudioStream.h>
 #include "OboeDebug.h"
 #include <oboe/Utilities.h>
@@ -41,10 +42,15 @@
     return Result::OK;
 }
 
-DataCallbackResult AudioStream::fireCallback(void *audioData, int32_t numFrames) {
+DataCallbackResult AudioStream::fireDataCallback(void *audioData, int32_t numFrames) {
+    if (!isDataCallbackEnabled()) {
+        LOGW("AudioStream::%s() called with data callback disabled!", __func__);
+        return DataCallbackResult::Stop; // We should not be getting called any more.
+    }
+
     int scheduler = sched_getscheduler(0) & ~SCHED_RESET_ON_FORK; // for current thread
     if (scheduler != mPreviousScheduler) {
-        LOGD("AudioStream::fireCallback() scheduler = %s",
+        LOGD("AudioStream::%s() scheduler = %s", __func__,
              ((scheduler == SCHED_FIFO) ? "SCHED_FIFO" :
              ((scheduler == SCHED_OTHER) ? "SCHED_OTHER" :
              ((scheduler == SCHED_RR) ? "SCHED_RR" : "UNKNOWN")))
@@ -58,6 +64,9 @@
     } else {
         result = mStreamCallback->onAudioReady(this, audioData, numFrames);
     }
+    // On Oreo, we might get called after returning stop.
+    // So block there here.
+    setDataCallbackEnabled(result == DataCallbackResult::Continue);
 
     return result;
 }
@@ -131,4 +140,19 @@
     return mFramesWritten;
 }
 
+static void oboe_stop_thread_proc(AudioStream *oboeStream) {
+    LOGD("%s() called ----)))))", __func__);
+    if (oboeStream != nullptr) {
+        oboeStream->requestStop();
+    }
+    LOGD("%s() returning (((((----", __func__);
+}
+
+void AudioStream::launchStopThread() {
+    // Stop this stream on a separate thread
+    // std::thread t(requestStop);
+    std::thread t(oboe_stop_thread_proc, this);
+    t.detach();
+}
+
 } // namespace oboe
diff --git a/src/opensles/AudioInputStreamOpenSLES.cpp b/src/opensles/AudioInputStreamOpenSLES.cpp
index 1606da3..841e28b 100644
--- a/src/opensles/AudioInputStreamOpenSLES.cpp
+++ b/src/opensles/AudioInputStreamOpenSLES.cpp
@@ -206,18 +206,24 @@
 }
 
 Result AudioInputStreamOpenSLES::close() {
-
+    LOGD("AudioInputStreamOpenSLES::%s()", __func__);
+    mLock.lock();
+    Result result = Result::OK;
     if (mState == StreamState::Closed){
-        return Result::ErrorClosed;
+        result = Result::ErrorClosed;
     } else {
+        mLock.unlock(); // avoid recursive lock
         requestStop();
+        mLock.lock();
+        // invalidate any interfaces
         mRecordInterface = NULL;
-        return AudioStreamOpenSLES::close();
+        result = AudioStreamOpenSLES::close();
     }
+    mLock.unlock(); // avoid recursive lock
+    return result;
 }
 
 Result AudioInputStreamOpenSLES::setRecordState(SLuint32 newState) {
-
     LOGD("AudioInputStreamOpenSLES::setRecordState(%d)", newState);
     Result result = Result::OK;
 
@@ -234,41 +240,64 @@
 }
 
 Result AudioInputStreamOpenSLES::requestStart() {
-
-    LOGD("AudioInputStreamOpenSLES::requestStart()");
+    LOGD("AudioInputStreamOpenSLES(): %s() called", __func__);
+    mLock.lock();
     StreamState initialState = getState();
-    if (initialState == StreamState::Closed) return Result::ErrorClosed;
+    switch (initialState) {
+        case StreamState::Starting:
+        case StreamState::Started:
+            mLock.unlock();
+            return Result::OK;
+        case StreamState::Closed:
+            mLock.unlock();
+            return Result::ErrorClosed;
+        default:
+            break;
+    }
 
+    setDataCallbackEnabled(true);
     setState(StreamState::Starting);
     Result result = setRecordState(SL_RECORDSTATE_RECORDING);
     if (result == Result::OK) {
         // Enqueue the first buffer so that we have data ready in the callback.
         setState(StreamState::Started);
+        mLock.unlock();
         enqueueCallbackBuffer(mSimpleBufferQueueInterface);
+        mLock.lock();
     } else {
         setState(initialState);
     }
+    mLock.unlock();
     return result;
 }
 
 
 Result AudioInputStreamOpenSLES::requestPause() {
-    LOGW("AudioInputStreamOpenSLES::requestPause() is intentionally not implemented for input "
-         "streams");
+    LOGW("AudioInputStreamOpenSLES::%s() is intentionally not implemented for input "
+         "streams", __func__);
     return Result::ErrorUnimplemented; // Matches AAudio behavior.
 }
 
 Result AudioInputStreamOpenSLES::requestFlush() {
-    LOGW("AudioInputStreamOpenSLES::requestFlush() is intentionally not implemented for input "
-         "streams");
+    LOGW("AudioInputStreamOpenSLES::%s() is intentionally not implemented for input "
+         "streams", __func__);
     return Result::ErrorUnimplemented; // Matches AAudio behavior.
 }
 
 Result AudioInputStreamOpenSLES::requestStop() {
+    LOGD("AudioInputStreamOpenSLES(): %s() called", __func__);
 
-    LOGD("AudioInputStreamOpenSLES::requestStop()");
+    std::lock_guard<std::mutex> lock(mLock);
     StreamState initialState = getState();
-    if (initialState == StreamState::Closed) return Result::ErrorClosed;
+    switch (initialState) {
+        case StreamState::Stopping:
+        case StreamState::Stopped:
+            return Result::OK;
+        case StreamState::Closed:
+            return Result::ErrorClosed;
+        default:
+            break;
+    }
 
     setState(StreamState::Stopping);
 
diff --git a/src/opensles/AudioOutputStreamOpenSLES.cpp b/src/opensles/AudioOutputStreamOpenSLES.cpp
index adcf2d4..ed65824 100644
--- a/src/opensles/AudioOutputStreamOpenSLES.cpp
+++ b/src/opensles/AudioOutputStreamOpenSLES.cpp
@@ -234,20 +234,24 @@
 
 Result AudioOutputStreamOpenSLES::onAfterDestroy() {
     OutputMixerOpenSL::getInstance().close();
-
     return Result::OK;
 }
 
 Result AudioOutputStreamOpenSLES::close() {
-
+    mLock.lock();
+    Result result = Result::OK;
     if (mState == StreamState::Closed){
-        return Result::ErrorClosed;
+        result = Result::ErrorClosed;
     } else {
+        mLock.unlock(); // avoid recursive lock
         requestPause();
+        mLock.lock();
         // invalidate any interfaces
         mPlayInterface = NULL;
-        return AudioStreamOpenSLES::close();
+        result = AudioStreamOpenSLES::close();
     }
+    mLock.unlock(); // avoid recursive lock
+    return result;
 }
 
 Result AudioOutputStreamOpenSLES::setPlayState(SLuint32 newState) {
@@ -269,27 +273,53 @@
 }
 
 Result AudioOutputStreamOpenSLES::requestStart() {
+    LOGD("AudioOutputStreamOpenSLES(): %s() called", __func__);
 
-    LOGD("AudioOutputStreamOpenSLES(): requestStart()");
+    mLock.lock();
     StreamState initialState = getState();
-    if (initialState == StreamState::Closed) return Result::ErrorClosed;
+    switch (initialState) {
+        case StreamState::Starting:
+        case StreamState::Started:
+            mLock.unlock();
+            return Result::OK;
+        case StreamState::Closed:
+            mLock.unlock();
+            return Result::ErrorClosed;
+        default:
+            break;
+    }
 
+    setDataCallbackEnabled(true);
     setState(StreamState::Starting);
     Result result = setPlayState(SL_PLAYSTATE_PLAYING);
     if (result == Result::OK) {
         setState(StreamState::Started);
+        mLock.unlock();
+        // This could call requestStop() so try to avoid a recursive lock.
         processBufferCallback(mSimpleBufferQueueInterface);
+        mLock.lock();
     } else {
         setState(initialState);
     }
+    LOGD("AudioOutputStreamOpenSLES(): %s() returning %d", __func__, result);
+    mLock.unlock();
     return result;
 }
 
 Result AudioOutputStreamOpenSLES::requestPause() {
+    LOGD("AudioOutputStreamOpenSLES(): %s() called", __func__);
 
-    LOGD("AudioOutputStreamOpenSLES::requestPause()");
+    std::lock_guard<std::mutex> lock(mLock);
     StreamState initialState = getState();
-    if (initialState == StreamState::Closed) return Result::ErrorClosed;
+    switch (initialState) {
+        case StreamState::Pausing:
+        case StreamState::Paused:
+            return Result::OK;
+        case StreamState::Closed:
+            return Result::ErrorClosed;
+        default:
+            break;
+    }
 
     setState(StreamState::Pausing);
     Result result = setPlayState(SL_PLAYSTATE_PAUSED);
@@ -303,6 +333,7 @@
     } else {
         setState(initialState);
     }
+    LOGD("AudioOutputStreamOpenSLES(): %s() returning %d", __func__, result);
     return result;
 }
 
@@ -310,28 +341,36 @@
  * Flush/clear the queue buffers
  */
 Result AudioOutputStreamOpenSLES::requestFlush() {
-
-    LOGD("AudioOutputStreamOpenSLES(): requestFlush()");
+    LOGD("AudioOutputStreamOpenSLES(): %s() called", __func__);
     if (getState() == StreamState::Closed) return Result::ErrorClosed;
-
+    Result result = Result::OK;
     if (mPlayInterface == NULL || mSimpleBufferQueueInterface == NULL) {
-        return Result::ErrorInvalidState;
+        result = Result::ErrorInvalidState;
     } else {
-        SLresult result = (*mSimpleBufferQueueInterface)->Clear(mSimpleBufferQueueInterface);
-        if (result == SL_RESULT_SUCCESS){
-            return Result::OK;
-        } else {
+        SLresult slResult = (*mSimpleBufferQueueInterface)->Clear(mSimpleBufferQueueInterface);
+        if (slResult != SL_RESULT_SUCCESS){
             LOGW("Failed to clear buffer queue. OpenSLES error: %d", result);
-            return Result::ErrorInternal;
+            result = Result::ErrorInternal;
         }
     }
+    LOGD("AudioOutputStreamOpenSLES(): %s() returning %d", __func__, result);
+    return result;
 }
 
 Result AudioOutputStreamOpenSLES::requestStop() {
+    LOGD("AudioOutputStreamOpenSLES(): %s() called", __func__);
 
-    LOGD("AudioOutputStreamOpenSLES(): requestStop()");
+    std::lock_guard<std::mutex> lock(mLock);
     StreamState initialState = getState();
-    if (initialState == StreamState::Closed) return Result::ErrorClosed;
+    switch (initialState) {
+        case StreamState::Stopping:
+        case StreamState::Stopped:
+            return Result::OK;
+        case StreamState::Closed:
+            return Result::ErrorClosed;
+        default:
+            break;
+    }
 
     setState(StreamState::Stopping);
 
@@ -352,8 +391,8 @@
     } else {
         setState(initialState);
     }
+    LOGD("AudioOutputStreamOpenSLES(): %s() returning %d", __func__, result);
     return result;
-
 }
 
 void AudioOutputStreamOpenSLES::setFramesRead(int64_t framesRead) {
diff --git a/src/opensles/AudioStreamOpenSLES.cpp b/src/opensles/AudioStreamOpenSLES.cpp
index 93dceb7..61d662e 100644
--- a/src/opensles/AudioStreamOpenSLES.cpp
+++ b/src/opensles/AudioStreamOpenSLES.cpp
@@ -243,13 +243,11 @@
     return (*bq)->Enqueue(bq, mCallbackBuffer, mBytesPerCallback);
 }
 
-SLresult AudioStreamOpenSLES::processBufferCallback(SLAndroidSimpleBufferQueueItf bq) {
+void AudioStreamOpenSLES::processBufferCallback(SLAndroidSimpleBufferQueueItf bq) {
+    bool stopStream = false;
     // Ask the callback to fill the output buffer with data.
-    DataCallbackResult result = fireCallback(mCallbackBuffer, mFramesPerCallback);
-    if (result != DataCallbackResult::Continue) {
-        LOGE("Oboe callback returned %d", result);
-        return SL_RESULT_INTERNAL_ERROR; // TODO How should we stop OpenSL ES.
-    } else {
+    DataCallbackResult result = fireDataCallback(mCallbackBuffer, mFramesPerCallback);
+    if (result == DataCallbackResult::Continue) {
         // Update Oboe service position based on OpenSL ES position.
         updateServiceFrameCounter();
         // Update Oboe client position with frames handled by the callback.
@@ -259,7 +257,21 @@
             mFramesWritten += mFramesPerCallback;
         }
         // Pass the data to OpenSLES.
-        return enqueueCallbackBuffer(bq);
+        SLresult result = enqueueCallbackBuffer(bq);
+        if (result != SL_RESULT_SUCCESS) {
+            LOGE("enqueueCallbackBuffer %d", result);
+            stopStream = true;
+        }
+    } else if (result == DataCallbackResult::Stop) {
+        LOGD("Oboe callback returned Stop");
+        stopStream = true;
+    } else {
+        LOGW("Oboe callback returned unexpected value = %d", result);
+        stopStream = true;
+    }
+    if (stopStream) {
+        requestStop();
+        // launchStopThread();
     }
 }
 
diff --git a/src/opensles/AudioStreamOpenSLES.h b/src/opensles/AudioStreamOpenSLES.h
index c4c42bb..d0a6e4f 100644
--- a/src/opensles/AudioStreamOpenSLES.h
+++ b/src/opensles/AudioStreamOpenSLES.h
@@ -70,13 +70,12 @@
      *
      * This is public, but don't call it directly.
      */
-    SLresult processBufferCallback(SLAndroidSimpleBufferQueueItf bq);
+    void processBufferCallback(SLAndroidSimpleBufferQueueItf bq);
 
     Result waitForStateChange(StreamState currentState,
                               StreamState *nextState,
                               int64_t timeoutNanoseconds) override;
 
-
 protected:
 
     SLuint32 channelCountToChannelMaskDefault(int channelCount);