| # Copyright 2024 gRPC authors. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
from typing import Dict, List, Optional

from opentelemetry.sdk.metrics.export import AggregationTemporality
from opentelemetry.sdk.metrics.export import MetricExportResult
from opentelemetry.sdk.metrics.export import MetricExporter
from opentelemetry.sdk.metrics.export import MetricsData
| |
| |
class OTelMetricExporter(MetricExporter):
    """Implementation of :class:`MetricExporter` that exports metrics into the
    provided ``all_metrics`` dict.

    all_metrics: A dict whose keys are grpc_observability._opentelemetry_measures.Metric.name,
        value is a list of labels recorded for that metric.
        An example item of this dict:
            {"grpc.client.attempt.started":
              [{'grpc.method': 'test/UnaryUnary', 'grpc.target': 'localhost:42517'},
               {'grpc.method': 'other', 'grpc.target': 'localhost:42517'}]}
    """

    def __init__(
        self,
        all_metrics: Dict[str, List],
        preferred_temporality: Optional[
            Dict[type, AggregationTemporality]
        ] = None,
        preferred_aggregation: Optional[
            Dict[type, "opentelemetry.sdk.metrics.view.Aggregation"]
        ] = None,
        print_live: bool = False,
    ):
        """Create an exporter that records into ``all_metrics``.

        Args:
          all_metrics: Dict that receives the recorded attributes. It is
            mutated in place by :meth:`record_metric`; the exporter keeps a
            reference to it rather than a copy.
          preferred_temporality: Forwarded unchanged to ``MetricExporter``.
          preferred_aggregation: Forwarded unchanged to ``MetricExporter``.
          print_live: When true, print the metric name to stdout as metrics
            are recorded.
        """
        super().__init__(
            preferred_temporality=preferred_temporality,
            preferred_aggregation=preferred_aggregation,
        )
        self._all_metrics = all_metrics
        self._print_live = print_live

    def export(
        self,
        metrics_data: MetricsData,
        timeout_millis: float = 10_000,
        **kwargs,
    ) -> MetricExportResult:
        """Record ``metrics_data`` and report success.

        ``timeout_millis`` and ``kwargs`` are accepted for interface
        compatibility but are not used by this exporter.
        """
        self.record_metric(metrics_data)
        return MetricExportResult.SUCCESS

    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
        """No-op: this exporter does nothing on shutdown."""

    def force_flush(self, timeout_millis: float = 10_000) -> bool:
        """Always return True; this exporter does no buffering of its own."""
        return True

    def record_metric(self, metrics_data: MetricsData) -> None:
        """Append the attributes of every data point to ``all_metrics``.

        Entries are keyed by metric name. A metric name that is not yet
        present in the dict gets a fresh list instead of raising KeyError,
        so a plain ``dict`` works as well as a ``defaultdict(list)``.
        """
        for resource_metric in metrics_data.resource_metrics:
            for scope_metric in resource_metric.scope_metrics:
                for metric in scope_metric.metrics:
                    for data_point in metric.data.data_points:
                        # setdefault() keeps existing lists (including ones
                        # created by a defaultdict) and creates missing ones.
                        self._all_metrics.setdefault(metric.name, []).append(
                            data_point.attributes
                        )
                    # NOTE(review): printed once per metric; confirm this
                    # nesting level against the upstream file.
                    if self._print_live:
                        print(f"Metric exporter received: {metric.name}")