blob: 5445906a0b09bb7afd760dc1810700658276b5d6 [file] [log] [blame] [edit]
/*
* Copyright (C) 2021 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "lights_observer.h"

#include <chrono>
#include <utility>

#include <android-base/logging.h>
#include <json/json.h>

#include "common/libs/utils/vsock_connection.h"
namespace cuttlefish {
namespace webrtc_streaming {
// Creates an observer for the lights service reachable at (cid, port).
//
// Only stores the connection parameters; no connection is attempted and no
// thread is started here (is_running_ and session_active_ start out false).
// Call Start() to begin connecting.
//
// port: port passed to the connection's Connect() call.
// cid: context id passed to the connection's Connect() call.
// vhost_user_vsock: when true, Start() passes an (arbitrary) listening cid to
//   Connect() instead of std::nullopt.
//
// last_client_channel_id_ starts at -1 so that the first id handed out by
// Subscribe(), which pre-increments it, is 0.
LightsObserver::LightsObserver(unsigned int port, unsigned int cid,
bool vhost_user_vsock)
: cid_(cid),
port_(port),
vhost_user_vsock_(vhost_user_vsock),
is_running_(false),
session_active_(false),
last_client_channel_id_(-1) {}
// Shuts the observer down through Stop(), which joins the connection thread
// (if one was started) before the members are destroyed.
LightsObserver::~LightsObserver() {
  Stop();
}
// Launches the background connection thread.
//
// Until is_running_ is cleared, the thread alternates between two phases:
// while the connection is up it keeps reading server messages, and once the
// connection is down it tries to reconnect, waiting one second after every
// failed attempt before trying again.
//
// Returns false (after logging an error) when the connection thread is
// already joinable, i.e. Start() was called before; returns true otherwise.
bool LightsObserver::Start() {
  if (connection_thread_.joinable()) {
    LOG(ERROR) << "Connection thread is already running.";
    return false;
  }

  is_running_ = true;
  connection_thread_ = std::thread([this] {
    constexpr auto kRetryDelay = std::chrono::milliseconds(1000);

    while (is_running_) {
      // Pump messages for as long as the current connection lasts.
      while (cvd_connection_.IsConnected()) {
        ReadServerMessages();
      }

      if (!is_running_) {
        // A stop was requested: do not reconnect, let the loop condition
        // terminate the thread.
        continue;
      }

      // Try to start a new connection. If this fails, delay retrying a bit.
      if (!cvd_connection_.Connect(
              port_, cid_,
              vhost_user_vsock_
                  ? std::optional(0) /* any value is okay for client */
                  : std::nullopt)) {
        std::this_thread::sleep_for(kRetryDelay);
      }
    }

    LOG(INFO) << "Exiting connection thread";
  });

  LOG(INFO) << "Connection thread running";
  return true;
}
// Stops the connection thread and waits for it to finish.
//
// Calling this when no thread was started is harmless: the flag is cleared,
// the connection is disconnected and the join is skipped.
void LightsObserver::Stop() {
  // Clear the flag before disconnecting: the connection loop only attempts a
  // reconnect while is_running_ is set.
  is_running_ = false;
  cvd_connection_.Disconnect();

  if (!connection_thread_.joinable()) {
    return;
  }
  // The connection_thread_ should finish at any point now. Let's join it.
  connection_thread_.join();
}
void LightsObserver::ReadServerMessages() {
static constexpr auto kEventKey = "event";
static constexpr auto kMessageStart = "VIRTUAL_DEVICE_START_LIGHTS_SESSION";
static constexpr auto kMessageStop = "VIRTUAL_DEVICE_STOP_LIGHTS_SESSION";
static constexpr auto kMessageUpdate = "VIRTUAL_DEVICE_LIGHTS_UPDATE";
auto json_value = cvd_connection_.ReadJsonMessage();
if (json_value[kEventKey] == kMessageStart) {
session_active_ = true;
} else if (json_value[kEventKey] == kMessageStop) {
session_active_ = false;
} else if (json_value[kEventKey] == kMessageUpdate && session_active_) {
// Cache the latest update for when new clients register
std::lock_guard<std::mutex> lock(clients_lock_);
cached_latest_update_ = json_value;
// Send update to all subscribed clients
for (auto itr = client_message_senders_.begin();
itr != client_message_senders_.end(); itr++) {
itr->second(json_value);
}
}
}
// Registers a client callback that receives lights updates.
//
// lights_message_sender: invoked with every lights update forwarded by
//   ReadServerMessages(). If an update has already been cached, the sender
//   is also invoked once with that cached update before this function
//   returns. Its bool return value is ignored. It is called while
//   clients_lock_ is held, so it must not call Subscribe()/Unsubscribe().
//
// Returns the id assigned to this subscription; pass it to Unsubscribe() to
// remove the sender. Ids are handed out in increasing order starting at 0.
int LightsObserver::Subscribe(
    std::function<bool(const Json::Value&)> lights_message_sender) {
  std::lock_guard<std::mutex> lock(clients_lock_);

  const int client_id = ++last_client_channel_id_;

  // The parameter is a by-value sink: move it into the map rather than
  // copying the std::function, then use the stored sender from here on.
  auto& registered_sender = client_message_senders_[client_id];
  registered_sender = std::move(lights_message_sender);

  // Replay the most recent update so a new client does not have to wait for
  // the next change to learn the current state.
  if (!cached_latest_update_.empty()) {
    registered_sender(cached_latest_update_);
  }
  return client_id;
}
// Removes the client sender registered under `id`, where `id` is a value
// previously returned by Subscribe(). Takes clients_lock_ for the duration
// of the removal.
void LightsObserver::Unsubscribe(int id) {
  const std::lock_guard<std::mutex> guard(clients_lock_);
  client_message_senders_.erase(id);
}
} // namespace webrtc_streaming
} // namespace cuttlefish