oboe: stop stream if callback requests it
Stop in a separate thread for AAudio before Q.
Call stop directly for OpenSL ES.
Possible fix for #277
diff --git a/include/oboe/AudioStream.h b/include/oboe/AudioStream.h
index 5898b9f..01748d1 100644
--- a/include/oboe/AudioStream.h
+++ b/include/oboe/AudioStream.h
@@ -371,6 +371,11 @@
return nullptr;
}
+ /**
+ * Launch a thread that will stop the stream.
+ */
+ void launchStopThread();
+
protected:
/**
@@ -402,7 +407,7 @@
* @return the result of the callback: stop or continue
*
*/
- DataCallbackResult fireCallback(void *audioData, int numFrames);
+ DataCallbackResult fireDataCallback(void *audioData, int numFrames);
/**
* Update mFramesWritten.
@@ -415,7 +420,22 @@
virtual void updateFramesRead() = 0;
/**
- * Number of frames which have been written into the stream.
+ * @return true if callbacks may be called
+ */
+ bool isDataCallbackEnabled() {
+ return mDataCallbackEnabled;
+ }
+
+ /**
+ * This can be set false internally to prevent callbacks
+ * after DataCallbackResult::Stop has been returned.
+ */
+ void setDataCallbackEnabled(bool enabled) {
+ mDataCallbackEnabled = enabled;
+ }
+
+ /**
+ * Number of frames which have been written into the stream
*
* This is signed integer to match the counters in AAudio.
* At audio rates, the counter will overflow in about six million years.
@@ -430,9 +450,13 @@
*/
std::atomic<int64_t> mFramesRead{};
+ std::mutex mLock; // for synchronizing start/stop/close
+
private:
int mPreviousScheduler = -1;
+ std::atomic<bool> mDataCallbackEnabled{};
+
};
} // namespace oboe
diff --git a/src/aaudio/AAudioLoader.cpp b/src/aaudio/AAudioLoader.cpp
index 9a863c2..326e2e4 100644
--- a/src/aaudio/AAudioLoader.cpp
+++ b/src/aaudio/AAudioLoader.cpp
@@ -108,13 +108,15 @@
int64_t timeoutNanoseconds)>
(dlsym(mLibHandle, "AAudioStream_waitForStateChange"));
-
stream_getTimestamp = reinterpret_cast<aaudio_result_t (*)(AAudioStream *stream,
- clockid_t clockid,
- int64_t *framePosition,
- int64_t *timeNanoseconds)>
+ clockid_t clockid,
+ int64_t *framePosition,
+ int64_t *timeNanoseconds)>
(dlsym(mLibHandle, "AAudioStream_getTimestamp"));
+ stream_isMMapUsed = reinterpret_cast<bool (*)(AAudioStream *stream)>
+ (dlsym(mLibHandle, "AAudioStream_isMMapUsed"));
+
stream_getChannelCount = load_I_PS("AAudioStream_getChannelCount");
if (stream_getChannelCount == nullptr) {
// Use old alias if needed.
diff --git a/src/aaudio/AAudioLoader.h b/src/aaudio/AAudioLoader.h
index f39926e..9f8cee3 100644
--- a/src/aaudio/AAudioLoader.h
+++ b/src/aaudio/AAudioLoader.h
@@ -113,6 +113,9 @@
int64_t *framePosition,
int64_t *timeNanoseconds);
+
+ bool (*stream_isMMapUsed)(AAudioStream *stream) = nullptr;
+
signature_I_PS stream_close = nullptr;
signature_I_PS stream_getChannelCount = nullptr;
diff --git a/src/aaudio/AudioStreamAAudio.cpp b/src/aaudio/AudioStreamAAudio.cpp
index e2c3bf8..018e0bf 100644
--- a/src/aaudio/AudioStreamAAudio.cpp
+++ b/src/aaudio/AudioStreamAAudio.cpp
@@ -38,9 +38,10 @@
int32_t numFrames) {
AudioStreamAAudio *oboeStream = reinterpret_cast<AudioStreamAAudio*>(userData);
- if (oboeStream != NULL) {
+ if (oboeStream != nullptr) {
return static_cast<aaudio_data_callback_result_t>(
oboeStream->callOnAudioReady(stream, audioData, numFrames));
+
} else {
return static_cast<aaudio_data_callback_result_t>(DataCallbackResult::Stop);
}
@@ -49,7 +50,7 @@
static void oboe_aaudio_error_thread_proc(AudioStreamAAudio *oboeStream,
AAudioStream *stream,
Result error) {
- if (oboeStream != NULL) {
+ if (oboeStream != nullptr) {
oboeStream->onErrorInThread(stream, error);
}
}
@@ -231,10 +232,28 @@
DataCallbackResult AudioStreamAAudio::callOnAudioReady(AAudioStream *stream,
void *audioData,
int32_t numFrames) {
- return mStreamCallback->onAudioReady(
- this,
- audioData,
- numFrames);
+ DataCallbackResult result = fireDataCallback(audioData, numFrames);
+ if (result == DataCallbackResult::Continue) {
+ return result;
+ } else {
+ if (result == DataCallbackResult::Stop) {
+ LOGE("Oboe callback returned DataCallbackResult::Stop");
+ } else {
+ LOGE("Oboe callback returned unexpected value = %d", result);
+ }
+
+ if (getSdkVersion() <= __ANDROID_API_P__) {
+ launchStopThread();
+ if (isMMapUsed()) {
+ return DataCallbackResult::Stop;
+ } else {
+ // Legacy stream <= API_P cannot be restarted after returning Stop.
+ return DataCallbackResult::Continue;
+ }
+ } else {
+ return DataCallbackResult::Stop; // OK >= API_Q
+ }
+ }
}
void AudioStreamAAudio::onErrorInThread(AAudioStream *stream, Result error) {
@@ -263,6 +282,7 @@
return Result::OK;
}
}
+ setDataCallbackEnabled(true);
return static_cast<Result>(mLibLoader->stream_requestStart(stream));
} else {
return Result::ErrorClosed;
@@ -508,4 +528,13 @@
return ResultWithValue<double>(latencyMillis);
}
+bool AudioStreamAAudio::isMMapUsed() {
+ AAudioStream *stream = mAAudioStream.load();
+ if (stream != nullptr) {
+ return mLibLoader->stream_isMMapUsed(stream);
+ } else {
+ return false;
+ }
+}
+
} // namespace oboe
diff --git a/src/aaudio/AudioStreamAAudio.h b/src/aaudio/AudioStreamAAudio.h
index ccabe05..8316d62 100644
--- a/src/aaudio/AudioStreamAAudio.h
+++ b/src/aaudio/AudioStreamAAudio.h
@@ -109,9 +109,10 @@
private:
+ bool isMMapUsed();
+
std::atomic<bool> mCallbackThreadEnabled;
- std::mutex mLock; // for synchronizing start/stop/close
std::atomic<AAudioStream *> mAAudioStream{nullptr};
static AAudioLoader *mLibLoader;
diff --git a/src/common/AudioStream.cpp b/src/common/AudioStream.cpp
index 966bc36..5dfafb1 100644
--- a/src/common/AudioStream.cpp
+++ b/src/common/AudioStream.cpp
@@ -16,6 +16,7 @@
#include <sys/types.h>
#include <pthread.h>
+#include <thread>
#include <oboe/AudioStream.h>
#include "OboeDebug.h"
#include <oboe/Utilities.h>
@@ -41,10 +42,15 @@
return Result::OK;
}
-DataCallbackResult AudioStream::fireCallback(void *audioData, int32_t numFrames) {
+DataCallbackResult AudioStream::fireDataCallback(void *audioData, int32_t numFrames) {
+ if (!isDataCallbackEnabled()) {
+ LOGW("AudioStream::%s() called with data callback disabled!", __func__);
+ return DataCallbackResult::Stop; // We should not be getting called any more.
+ }
+
int scheduler = sched_getscheduler(0) & ~SCHED_RESET_ON_FORK; // for current thread
if (scheduler != mPreviousScheduler) {
- LOGD("AudioStream::fireCallback() scheduler = %s",
+ LOGD("AudioStream::%s() scheduler = %s", __func__,
((scheduler == SCHED_FIFO) ? "SCHED_FIFO" :
((scheduler == SCHED_OTHER) ? "SCHED_OTHER" :
((scheduler == SCHED_RR) ? "SCHED_RR" : "UNKNOWN")))
@@ -58,6 +64,9 @@
} else {
result = mStreamCallback->onAudioReady(this, audioData, numFrames);
}
+ // On Oreo, we might get called after returning stop.
+ // So block there here.
+ setDataCallbackEnabled(result == DataCallbackResult::Continue);
return result;
}
@@ -131,4 +140,19 @@
return mFramesWritten;
}
+static void oboe_stop_thread_proc(AudioStream *oboeStream) {
+ LOGD("%s() called ----)))))", __func__);
+ if (oboeStream != nullptr) {
+ oboeStream->requestStop();
+ }
+ LOGD("%s() returning (((((----", __func__);
+}
+
+void AudioStream::launchStopThread() {
+ // Stop this stream on a separate thread
+ // std::thread t(requestStop);
+ std::thread t(oboe_stop_thread_proc, this);
+ t.detach();
+}
+
} // namespace oboe
diff --git a/src/opensles/AudioInputStreamOpenSLES.cpp b/src/opensles/AudioInputStreamOpenSLES.cpp
index 1606da3..841e28b 100644
--- a/src/opensles/AudioInputStreamOpenSLES.cpp
+++ b/src/opensles/AudioInputStreamOpenSLES.cpp
@@ -206,18 +206,24 @@
}
Result AudioInputStreamOpenSLES::close() {
-
+ LOGD("AudioInputStreamOpenSLES::%s()", __func__);
+ mLock.lock();
+ Result result = Result::OK;
if (mState == StreamState::Closed){
- return Result::ErrorClosed;
+ result = Result::ErrorClosed;
} else {
+ mLock.unlock(); // avoid recursive lock
requestStop();
+ mLock.lock();
+ // invalidate any interfaces
mRecordInterface = NULL;
- return AudioStreamOpenSLES::close();
+ result = AudioStreamOpenSLES::close();
}
+ mLock.unlock(); // avoid recursive lock
+ return result;
}
Result AudioInputStreamOpenSLES::setRecordState(SLuint32 newState) {
-
LOGD("AudioInputStreamOpenSLES::setRecordState(%d)", newState);
Result result = Result::OK;
@@ -234,41 +240,64 @@
}
Result AudioInputStreamOpenSLES::requestStart() {
-
- LOGD("AudioInputStreamOpenSLES::requestStart()");
+ LOGD("AudioInputStreamOpenSLES(): %s() called", __func__);
+ mLock.lock();
StreamState initialState = getState();
- if (initialState == StreamState::Closed) return Result::ErrorClosed;
+ switch (initialState) {
+ case StreamState::Starting:
+ case StreamState::Started:
+ mLock.unlock();
+ return Result::OK;
+ case StreamState::Closed:
+ mLock.unlock();
+ return Result::ErrorClosed;
+ default:
+ break;
+ }
+ setDataCallbackEnabled(true);
setState(StreamState::Starting);
Result result = setRecordState(SL_RECORDSTATE_RECORDING);
if (result == Result::OK) {
// Enqueue the first buffer so that we have data ready in the callback.
setState(StreamState::Started);
+ mLock.unlock();
enqueueCallbackBuffer(mSimpleBufferQueueInterface);
+ mLock.lock();
} else {
setState(initialState);
}
+ mLock.unlock();
return result;
}
Result AudioInputStreamOpenSLES::requestPause() {
- LOGW("AudioInputStreamOpenSLES::requestPause() is intentionally not implemented for input "
- "streams");
+ LOGW("AudioInputStreamOpenSLES::%s() is intentionally not implemented for input "
+ "streams", __func__);
return Result::ErrorUnimplemented; // Matches AAudio behavior.
}
Result AudioInputStreamOpenSLES::requestFlush() {
- LOGW("AudioInputStreamOpenSLES::requestFlush() is intentionally not implemented for input "
- "streams");
+ LOGW("AudioInputStreamOpenSLES::%s() is intentionally not implemented for input "
+ "streams", __func__);
return Result::ErrorUnimplemented; // Matches AAudio behavior.
}
Result AudioInputStreamOpenSLES::requestStop() {
+ LOGD("AudioInputStreamOpenSLES(): %s() called", __func__);
- LOGD("AudioInputStreamOpenSLES::requestStop()");
+ std::lock_guard<std::mutex> lock(mLock);
StreamState initialState = getState();
- if (initialState == StreamState::Closed) return Result::ErrorClosed;
+ switch (initialState) {
+ case StreamState::Stopping:
+ case StreamState::Stopped:
+ return Result::OK;
+ case StreamState::Closed:
+ return Result::ErrorClosed;
+ default:
+ break;
+ }
setState(StreamState::Stopping);
diff --git a/src/opensles/AudioOutputStreamOpenSLES.cpp b/src/opensles/AudioOutputStreamOpenSLES.cpp
index adcf2d4..ed65824 100644
--- a/src/opensles/AudioOutputStreamOpenSLES.cpp
+++ b/src/opensles/AudioOutputStreamOpenSLES.cpp
@@ -234,20 +234,24 @@
Result AudioOutputStreamOpenSLES::onAfterDestroy() {
OutputMixerOpenSL::getInstance().close();
-
return Result::OK;
}
Result AudioOutputStreamOpenSLES::close() {
-
+ mLock.lock();
+ Result result = Result::OK;
if (mState == StreamState::Closed){
- return Result::ErrorClosed;
+ result = Result::ErrorClosed;
} else {
+ mLock.unlock(); // avoid recursive lock
requestPause();
+ mLock.lock();
// invalidate any interfaces
mPlayInterface = NULL;
- return AudioStreamOpenSLES::close();
+ result = AudioStreamOpenSLES::close();
}
+ mLock.unlock(); // avoid recursive lock
+ return result;
}
Result AudioOutputStreamOpenSLES::setPlayState(SLuint32 newState) {
@@ -269,27 +273,53 @@
}
Result AudioOutputStreamOpenSLES::requestStart() {
+ LOGD("AudioOutputStreamOpenSLES(): %s() called", __func__);
- LOGD("AudioOutputStreamOpenSLES(): requestStart()");
+ mLock.lock();
StreamState initialState = getState();
- if (initialState == StreamState::Closed) return Result::ErrorClosed;
+ switch (initialState) {
+ case StreamState::Starting:
+ case StreamState::Started:
+ mLock.unlock();
+ return Result::OK;
+ case StreamState::Closed:
+ mLock.unlock();
+ return Result::ErrorClosed;
+ default:
+ break;
+ }
+ setDataCallbackEnabled(true);
setState(StreamState::Starting);
Result result = setPlayState(SL_PLAYSTATE_PLAYING);
if (result == Result::OK) {
setState(StreamState::Started);
+ mLock.unlock();
+ // This could call requestStop() so try to avoid a recursive lock.
processBufferCallback(mSimpleBufferQueueInterface);
+ mLock.lock();
} else {
setState(initialState);
}
+ LOGD("AudioOutputStreamOpenSLES(): %s() returning %d", __func__, result);
+ mLock.unlock();
return result;
}
Result AudioOutputStreamOpenSLES::requestPause() {
+ LOGD("AudioOutputStreamOpenSLES(): %s() called", __func__);
- LOGD("AudioOutputStreamOpenSLES::requestPause()");
+ std::lock_guard<std::mutex> lock(mLock);
StreamState initialState = getState();
- if (initialState == StreamState::Closed) return Result::ErrorClosed;
+ switch (initialState) {
+ case StreamState::Pausing:
+ case StreamState::Paused:
+ return Result::OK;
+ case StreamState::Closed:
+ return Result::ErrorClosed;
+ default:
+ break;
+ }
setState(StreamState::Pausing);
Result result = setPlayState(SL_PLAYSTATE_PAUSED);
@@ -303,6 +333,7 @@
} else {
setState(initialState);
}
+ LOGD("AudioOutputStreamOpenSLES(): %s() returning %d", __func__, result);
return result;
}
@@ -310,28 +341,36 @@
* Flush/clear the queue buffers
*/
Result AudioOutputStreamOpenSLES::requestFlush() {
-
- LOGD("AudioOutputStreamOpenSLES(): requestFlush()");
+ LOGD("AudioOutputStreamOpenSLES(): %s() called", __func__);
if (getState() == StreamState::Closed) return Result::ErrorClosed;
-
+ Result result = Result::OK;
if (mPlayInterface == NULL || mSimpleBufferQueueInterface == NULL) {
- return Result::ErrorInvalidState;
+ result = Result::ErrorInvalidState;
} else {
- SLresult result = (*mSimpleBufferQueueInterface)->Clear(mSimpleBufferQueueInterface);
- if (result == SL_RESULT_SUCCESS){
- return Result::OK;
- } else {
+ SLresult slResult = (*mSimpleBufferQueueInterface)->Clear(mSimpleBufferQueueInterface);
+ if (slResult != SL_RESULT_SUCCESS){
LOGW("Failed to clear buffer queue. OpenSLES error: %d", result);
- return Result::ErrorInternal;
+ result = Result::ErrorInternal;
}
}
+ LOGD("AudioOutputStreamOpenSLES(): %s() returning %d", __func__, result);
+ return result;
}
Result AudioOutputStreamOpenSLES::requestStop() {
+ LOGD("AudioOutputStreamOpenSLES(): %s() called", __func__);
- LOGD("AudioOutputStreamOpenSLES(): requestStop()");
+ std::lock_guard<std::mutex> lock(mLock);
StreamState initialState = getState();
- if (initialState == StreamState::Closed) return Result::ErrorClosed;
+ switch (initialState) {
+ case StreamState::Stopping:
+ case StreamState::Stopped:
+ return Result::OK;
+ case StreamState::Closed:
+ return Result::ErrorClosed;
+ default:
+ break;
+ }
setState(StreamState::Stopping);
@@ -352,8 +391,8 @@
} else {
setState(initialState);
}
+ LOGD("AudioOutputStreamOpenSLES(): %s() returning %d", __func__, result);
return result;
-
}
void AudioOutputStreamOpenSLES::setFramesRead(int64_t framesRead) {
diff --git a/src/opensles/AudioStreamOpenSLES.cpp b/src/opensles/AudioStreamOpenSLES.cpp
index 93dceb7..61d662e 100644
--- a/src/opensles/AudioStreamOpenSLES.cpp
+++ b/src/opensles/AudioStreamOpenSLES.cpp
@@ -243,13 +243,11 @@
return (*bq)->Enqueue(bq, mCallbackBuffer, mBytesPerCallback);
}
-SLresult AudioStreamOpenSLES::processBufferCallback(SLAndroidSimpleBufferQueueItf bq) {
+void AudioStreamOpenSLES::processBufferCallback(SLAndroidSimpleBufferQueueItf bq) {
+ bool stopStream = false;
// Ask the callback to fill the output buffer with data.
- DataCallbackResult result = fireCallback(mCallbackBuffer, mFramesPerCallback);
- if (result != DataCallbackResult::Continue) {
- LOGE("Oboe callback returned %d", result);
- return SL_RESULT_INTERNAL_ERROR; // TODO How should we stop OpenSL ES.
- } else {
+ DataCallbackResult result = fireDataCallback(mCallbackBuffer, mFramesPerCallback);
+ if (result == DataCallbackResult::Continue) {
// Update Oboe service position based on OpenSL ES position.
updateServiceFrameCounter();
// Update Oboe client position with frames handled by the callback.
@@ -259,7 +257,21 @@
mFramesWritten += mFramesPerCallback;
}
// Pass the data to OpenSLES.
- return enqueueCallbackBuffer(bq);
+ SLresult result = enqueueCallbackBuffer(bq);
+ if (result != SL_RESULT_SUCCESS) {
+ LOGE("enqueueCallbackBuffer %d", result);
+ stopStream = true;
+ }
+ } else if (result == DataCallbackResult::Stop) {
+ LOGD("Oboe callback returned Stop");
+ stopStream = true;
+ } else {
+ LOGW("Oboe callback returned unexpected value = %d", result);
+ stopStream = true;
+ }
+ if (stopStream) {
+ requestStop();
+ // launchStopThread();
}
}
diff --git a/src/opensles/AudioStreamOpenSLES.h b/src/opensles/AudioStreamOpenSLES.h
index c4c42bb..d0a6e4f 100644
--- a/src/opensles/AudioStreamOpenSLES.h
+++ b/src/opensles/AudioStreamOpenSLES.h
@@ -70,13 +70,12 @@
*
* This is public, but don't call it directly.
*/
- SLresult processBufferCallback(SLAndroidSimpleBufferQueueItf bq);
+ void processBufferCallback(SLAndroidSimpleBufferQueueItf bq);
Result waitForStateChange(StreamState currentState,
StreamState *nextState,
int64_t timeoutNanoseconds) override;
-
protected:
SLuint32 channelCountToChannelMaskDefault(int channelCount);