| // Copyright 2014 The Android Open Source Project |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #pragma once |
| |
| #include "aemu/base/Optional.h" |
| #include "aemu/base/synchronization/AndroidConditionVariable.h" |
| #include "aemu/base/synchronization/AndroidLock.h" |
| |
| #include <utility> |
| #include <stddef.h> |
| |
| namespace gfxstream { |
| namespace guest { |
| |
| // Base non-templated class used to reduce the amount of template |
| // specialization. |
| class MessageChannelBase { |
| public: |
| // Get the current channel size |
| size_t size() const; |
| |
| // Abort the currently pending operations and don't allow any other ones |
| void stop(); |
| |
| // Check if the channel is stopped. |
| bool isStopped() const; |
| |
| // Block until the channel has no pending messages. |
| void waitForEmpty(); |
| |
| protected: |
| // Constructor. |capacity| is the buffer capacity in messages. |
| MessageChannelBase(size_t capacity); |
| |
| // Destructor. |
| ~MessageChannelBase() = default; |
| |
| // Call this method in the sender thread before writing a new message. |
| // This returns the position of the available slot in the message array |
| // where to copy the new fixed-size message. After the copy, call |
| // afterWrite(). |
| // If the channel is stopped, return value is undefined. |
| size_t beforeWrite(); |
| |
| // Same as beforeWrite(), but returns an empty optional if there was |
| // no room to write to instead of waiting for it. |
| // One still needs to call afterWrite() anyway. |
| Optional<size_t> beforeTryWrite(); |
| |
| // To be called after trying to write a new fixed-size message (which should |
| // happen after beforeWrite() or beforeTryWrite()). |
| // |success| must be true to indicate that a new item was added to the |
| // channel, or false otherwise (i.e. if the channel is stopped, or if |
| // beforeTryWrite() returned an empty optional). |
| void afterWrite(bool success); |
| |
| // Call this method in the receiver thread before reading a new message. |
| // This returns the position in the message array where the new message |
| // can be read. Caller must process the message, then call afterRead(). |
| // If the channel is stopped, return value is undefined. |
| size_t beforeRead(); |
| |
| // Same as beforeRead(), but returns an empty optional if there was |
| // no data to read instead of waiting for it. |
| // One still needs to call afterWrite() anyway. |
| Optional<size_t> beforeTryRead(); |
| |
| // Same as beforeRead(), but returns an empty optional if no data arrived |
| // by the |wallTimeUs| absolute time. One still needs to call |
| // afterWrite() anyway. |
| Optional<size_t> beforeTimedRead(uint64_t wallTimeUs); |
| |
| // To be called after reading a fixed-size message from the channel (which |
| // must happen after beforeRead() or beforeTryRead()). |
| // |success| must be true to indicate that a message was read, or false |
| // otherwise (i.e. if the channel is stopped or if beforeTryRead() returned |
| // an empty optional). |
| void afterRead(bool success); |
| |
| // A version of isStopped() that doesn't lock the channel but expects it |
| // to be locked by the caller. |
| bool isStoppedLocked() const { return mStopped; } |
| |
| private: |
| size_t mPos = 0; |
| size_t mCapacity; |
| size_t mCount = 0; |
| bool mStopped = false; |
| mutable Lock mLock; // Mutable to allow const members to lock it. |
| ConditionVariable mCanRead; |
| ConditionVariable mCanWrite; |
| }; |
| |
| // Helper class used to implement an uni-directional IPC channel between |
| // two threads. The channel can be used to send fixed-size messages of type |
| // |T|, with an internal buffer size of |CAPACITY| items. All calls are |
| // blocking. |
| // |
| // Usage is pretty straightforward: |
| // |
| // - From the sender thread, call send(msg); |
| // - From the receiver thread, call receive(&msg); |
| // - If you want to stop the IPC, call stop(); |
| template <typename T, size_t CAPACITY> |
| class MessageChannel : public MessageChannelBase { |
| public: |
| MessageChannel() : MessageChannelBase(CAPACITY) {} |
| |
| bool send(const T& msg) { |
| const size_t pos = beforeWrite(); |
| const bool res = !isStoppedLocked(); |
| if (res) { |
| mItems[pos] = msg; |
| } |
| afterWrite(res); |
| return res; |
| } |
| |
| bool send(T&& msg) { |
| const size_t pos = beforeWrite(); |
| const bool res = !isStoppedLocked(); |
| if (res) { |
| mItems[pos] = std::move(msg); |
| } |
| afterWrite(res); |
| return res; |
| } |
| |
| bool trySend(const T& msg) { |
| const auto pos = beforeTryWrite(); |
| if (pos) { |
| mItems[*pos] = msg; |
| } |
| afterWrite(pos); |
| return pos; |
| } |
| |
| bool trySend(T&& msg) { |
| const auto pos = beforeTryWrite(); |
| if (pos) { |
| mItems[*pos] = std::move(msg); |
| } |
| afterWrite(pos); |
| return pos; |
| } |
| |
| bool receive(T* msg) { |
| const size_t pos = beforeRead(); |
| const bool res = !isStoppedLocked(); |
| if (res) { |
| *msg = std::move(mItems[pos]); |
| } |
| afterRead(res); |
| return res; |
| } |
| |
| Optional<T> receive() { |
| const size_t pos = beforeRead(); |
| if (!isStoppedLocked()) { |
| Optional<T> msg(std::move(mItems[pos])); |
| afterRead(true); |
| return msg; |
| } else { |
| afterRead(false); |
| return {}; |
| } |
| } |
| |
| bool tryReceive(T* msg) { |
| const auto pos = beforeTryRead(); |
| if (pos) { |
| *msg = std::move(mItems[*pos]); |
| } |
| afterRead(pos); |
| return pos; |
| } |
| |
| Optional<T> timedReceive(uint64_t wallTimeUs) { |
| const auto pos = beforeTimedRead(wallTimeUs); |
| if (pos && !isStoppedLocked()) { |
| Optional<T> res(std::move(mItems[*pos])); |
| afterRead(true); |
| return res; |
| } |
| afterRead(false); |
| return {}; |
| } |
| |
| constexpr size_t capacity() const { return CAPACITY; } |
| |
| private: |
| T mItems[CAPACITY]; |
| }; |
| |
| } // namespace guest |
| } // namespace gfxstream |