blob: 8a3f1ecc58e21196caea55460e706d407b72bb08 [file] [log] [blame]
//
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include <grpc/support/port_platform.h>
#include "src/cpp/ext/gcp/environment_autodetect.h"
#include <memory>
#include <utility>
#include "absl/container/flat_hash_map.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/types/optional.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpcpp/impl/grpc_library.h>
#include "src/core/ext/gcp/metadata_query.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/env.h"
#include "src/core/lib/gprpp/load_file.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/slice/slice.h"
namespace grpc {
namespace internal {
namespace {
grpc_core::TraceFlag grpc_environment_autodetect_trace(
false, "environment_autodetect");
// This is not a definite method to get the namespace name for GKE, but it is
// the best we have.
std::string GetNamespaceName() {
// Read the root file.
const char* filename =
"/var/run/secrets/kubernetes.io/serviceaccount/namespace";
auto namespace_name = grpc_core::LoadFile(filename, false);
if (!namespace_name.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_environment_autodetect_trace)) {
gpr_log(GPR_DEBUG, "Reading file %s failed: %s", filename,
grpc_core::StatusToString(namespace_name.status()).c_str());
}
// Fallback on an environment variable
return grpc_core::GetEnv("NAMESPACE_NAME").value_or("");
}
return std::string(reinterpret_cast<const char*>((*namespace_name).begin()),
(*namespace_name).length());
}
// Get pod name for GKE
std::string GetPodName() {
auto pod_name = grpc_core::GetEnv("POD_NAME");
if (pod_name.has_value()) {
return pod_name.value();
}
return grpc_core::GetEnv("HOSTNAME").value_or("");
}
// Get container name for GKE
std::string GetContainerName() {
return grpc_core::GetEnv("HOSTNAME").value_or("");
}
// Get function name for Cloud Functions
std::string GetFunctionName() {
auto k_service = grpc_core::GetEnv("K_SERVICE");
if (k_service.has_value()) {
return k_service.value();
}
return grpc_core::GetEnv("FUNCTION_NAME").value_or("");
}
// Get revision name for Cloud run
std::string GetRevisionName() {
return grpc_core::GetEnv("K_REVISION").value_or("");
}
// Get service name for Cloud run
std::string GetServiceName() {
return grpc_core::GetEnv("K_SERVICE").value_or("");
}
// Get configuration name for Cloud run
std::string GetConfiguratioName() {
return grpc_core::GetEnv("K_CONFIGURATION").value_or("");
}
// Get module ID for App Engine
std::string GetModuleId() {
return grpc_core::GetEnv("GAE_SERVICE").value_or("");
}
// Get version ID for App Engine
std::string GetVersionId() {
return grpc_core::GetEnv("GAE_VERSION").value_or("");
}
// Fire and forget class
class EnvironmentAutoDetectHelper
: public grpc_core::InternallyRefCounted<EnvironmentAutoDetectHelper>,
private internal::GrpcLibrary {
public:
EnvironmentAutoDetectHelper(
std::string project_id,
absl::AnyInvocable<void(EnvironmentAutoDetect::ResourceType)> on_done,
std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine)
: InternallyRefCounted(/*trace=*/nullptr, /*initial_refcount=*/2),
project_id_(std::move(project_id)),
on_done_(std::move(on_done)),
event_engine_(std::move(event_engine)) {
grpc_core::ExecCtx exec_ctx;
// TODO(yashykt): The pollset stuff should go away once the HTTP library is
// ported over to use EventEngine.
pollset_ = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
grpc_pollset_init(pollset_, &mu_poll_);
pollent_ = grpc_polling_entity_create_from_pollset(pollset_);
// TODO(yashykt): Note that using EventEngine::Run is not fork-safe. If we
// want to make this fork-safe, we might need some re-work here.
event_engine_->Run([this] { PollLoop(); });
AutoDetect();
}
~EnvironmentAutoDetectHelper() override {
grpc_core::ExecCtx exec_ctx;
grpc_pollset_shutdown(
pollset_, GRPC_CLOSURE_CREATE(
[](void* arg, absl::Status /* status */) {
grpc_pollset_destroy(static_cast<grpc_pollset*>(arg));
gpr_free(arg);
},
pollset_, nullptr));
}
void Orphan() override {
grpc_core::Crash("Illegal Orphan() call on EnvironmentAutoDetectHelper.");
}
private:
struct Attribute {
std::string resource_attribute;
std::string metadata_server_atttribute;
};
void PollLoop() {
grpc_core::ExecCtx exec_ctx;
bool done = false;
gpr_mu_lock(mu_poll_);
grpc_pollset_worker* worker = nullptr;
if (!GRPC_LOG_IF_ERROR(
"pollset_work",
grpc_pollset_work(grpc_polling_entity_pollset(&pollent_), &worker,
grpc_core::Timestamp::InfPast()))) {
notify_poller_ = true;
}
done = notify_poller_;
gpr_mu_unlock(mu_poll_);
if (!done) {
event_engine_->RunAfter(grpc_core::Duration::Milliseconds(100),
[this] { PollLoop(); });
} else {
Unref();
}
}
void AutoDetect() {
grpc_core::MutexLock lock(&mu_);
// GKE
resource_.labels.emplace("project_id", project_id_);
if (grpc_core::GetEnv("KUBERNETES_SERVICE_HOST").has_value()) {
resource_.resource_type = "k8s_container";
resource_.labels.emplace("namespace_name", GetNamespaceName());
resource_.labels.emplace("pod_name", GetPodName());
resource_.labels.emplace("container_name", GetContainerName());
attributes_to_fetch_.emplace(grpc_core::MetadataQuery::kZoneAttribute,
"location");
attributes_to_fetch_.emplace(
grpc_core::MetadataQuery::kClusterNameAttribute, "cluster_name");
}
// Cloud Functions
else if (grpc_core::GetEnv("FUNCTION_NAME").has_value() ||
grpc_core::GetEnv("FUNCTION_TARGET").has_value()) {
resource_.resource_type = "cloud_function";
resource_.labels.emplace("function_name", GetFunctionName());
attributes_to_fetch_.emplace(grpc_core::MetadataQuery::kRegionAttribute,
"region");
}
// Cloud Run
else if (grpc_core::GetEnv("K_CONFIGURATION").has_value()) {
resource_.resource_type = "cloud_run_revision";
resource_.labels.emplace("revision_name", GetRevisionName());
resource_.labels.emplace("service_name", GetServiceName());
resource_.labels.emplace("configuration_name", GetConfiguratioName());
attributes_to_fetch_.emplace(grpc_core::MetadataQuery::kRegionAttribute,
"location");
}
// App Engine
else if (grpc_core::GetEnv("GAE_SERVICE").has_value()) {
resource_.resource_type = "gae_app";
resource_.labels.emplace("module_id", GetModuleId());
resource_.labels.emplace("version_id", GetVersionId());
attributes_to_fetch_.emplace(grpc_core::MetadataQuery::kZoneAttribute,
"zone");
}
// Assume GCE
else {
assuming_gce_ = true;
resource_.resource_type = "gce_instance";
attributes_to_fetch_.emplace(
grpc_core::MetadataQuery::kInstanceIdAttribute, "instance_id");
attributes_to_fetch_.emplace(grpc_core::MetadataQuery::kZoneAttribute,
"zone");
}
FetchMetadataServerAttributesAsynchronouslyLocked();
}
void FetchMetadataServerAttributesAsynchronouslyLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
GPR_ASSERT(!attributes_to_fetch_.empty());
for (auto& element : attributes_to_fetch_) {
queries_.push_back(grpc_core::MakeOrphanable<grpc_core::MetadataQuery>(
element.first, &pollent_,
[this](std::string attribute, absl::StatusOr<std::string> result) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_environment_autodetect_trace)) {
gpr_log(
GPR_INFO,
"Environment AutoDetect: Attribute: \"%s\" Result: \"%s\"",
attribute.c_str(),
result.ok()
? result.value().c_str()
: grpc_core::StatusToString(result.status()).c_str());
}
absl::optional<EnvironmentAutoDetect::ResourceType> resource;
{
grpc_core::MutexLock lock(&mu_);
auto it = attributes_to_fetch_.find(attribute);
if (it != attributes_to_fetch_.end()) {
if (result.ok()) {
resource_.labels.emplace(std::move(it->second),
std::move(result).value());
}
// If fetching from the MetadataServer failed and we were
// assuming a GCE environment, fallback to "global".
else if (assuming_gce_) {
if (GRPC_TRACE_FLAG_ENABLED(
grpc_environment_autodetect_trace)) {
gpr_log(GPR_INFO,
"Environment Autodetect: Falling back to global "
"resource type");
}
assuming_gce_ = false;
resource_.resource_type = "global";
}
attributes_to_fetch_.erase(it);
} else {
// This should not happen
gpr_log(GPR_ERROR,
"An unexpected attribute was seen from the "
"MetadataServer: %s",
attribute.c_str());
}
if (attributes_to_fetch_.empty()) {
resource = std::move(resource_);
}
}
if (resource.has_value()) {
gpr_mu_lock(mu_poll_);
notify_poller_ = true;
gpr_mu_unlock(mu_poll_);
auto on_done = std::move(on_done_);
Unref();
on_done(std::move(resource).value());
}
},
grpc_core::Duration::Seconds(10)));
}
}
const std::string project_id_;
grpc_pollset* pollset_ = nullptr;
grpc_polling_entity pollent_;
gpr_mu* mu_poll_ = nullptr;
absl::AnyInvocable<void(EnvironmentAutoDetect::ResourceType)> on_done_;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_;
grpc_core::Mutex mu_;
bool notify_poller_ = false;
absl::flat_hash_map<std::string /* metadata_server_attribute */,
std::string /* resource_attribute */>
attributes_to_fetch_ ABSL_GUARDED_BY(mu_);
std::vector<grpc_core::OrphanablePtr<grpc_core::MetadataQuery>> queries_
ABSL_GUARDED_BY(mu_);
EnvironmentAutoDetect::ResourceType resource_ ABSL_GUARDED_BY(mu_);
// This would be true if we are assuming the resource to be GCE. In this case,
// there is a chance that it will fail and we should instead just use
// "global".
bool assuming_gce_ ABSL_GUARDED_BY(mu_) = false;
};
EnvironmentAutoDetect* g_autodetect = nullptr;
} // namespace
void EnvironmentAutoDetect::Create(std::string project_id) {
GPR_ASSERT(g_autodetect == nullptr && !project_id.empty());
g_autodetect = new EnvironmentAutoDetect(project_id);
}
EnvironmentAutoDetect& EnvironmentAutoDetect::Get() { return *g_autodetect; }
EnvironmentAutoDetect::EnvironmentAutoDetect(std::string project_id)
: project_id_(std::move(project_id)) {
GPR_ASSERT(!project_id_.empty());
}
void EnvironmentAutoDetect::NotifyOnDone(absl::AnyInvocable<void()> callback) {
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine;
{
grpc_core::MutexLock lock(&mu_);
// Environment has already been detected
if (resource_ != nullptr) {
// Execute on the event engine to avoid deadlocks.
return event_engine_->Run(std::move(callback));
}
callbacks_.push_back(std::move(callback));
// Use the event_engine_ pointer as a signal to judge whether we've started
// detecting the environment.
if (event_engine_ == nullptr) {
event_engine_ = grpc_event_engine::experimental::GetDefaultEventEngine();
event_engine = event_engine_;
}
}
if (event_engine) {
new EnvironmentAutoDetectHelper(
project_id_,
[this](EnvironmentAutoDetect::ResourceType resource) {
std::vector<absl::AnyInvocable<void()>> callbacks;
{
grpc_core::MutexLock lock(&mu_);
resource_ = std::make_unique<EnvironmentAutoDetect::ResourceType>(
std::move(resource));
callbacks = std::move(callbacks_);
}
for (auto& callback : callbacks) {
callback();
}
},
std::move(event_engine));
}
}
} // namespace internal
} // namespace grpc