| # Contains all of the classes that are shared across both the Canary Wrapper and the Persistent Canary Wrapper scripts |
| # If a class can/is reused, then it should be in this file. |
| |
| # Needs to be installed prior to running |
| import boto3 |
| import psutil |
| # Part of standard packages in Python 3.4+ |
| import time |
| import os |
| import json |
| import subprocess |
| import zipfile |
| import datetime |
| |
| # ================================================================================ |
| |
# Class that holds metric data and has a few utility functions for getting that data in a format we can use for Cloudwatch
class DataSnapshot_Metric():
    def __init__(self, metric_name, metric_function, metric_dimensions=None,
                 metric_unit="None", metric_alarm_threshold=None, metric_alarm_severity=6,
                 git_hash="", git_repo_name="", reports_to_skip=0, is_percent=False):
        """
        Holds one metric's configuration and its most recently polled value.

        * metric_name - the name Cloudwatch will use for this metric
        * metric_function - callback invoked with a psutil.Process to produce the metric value
        * metric_dimensions - list of Cloudwatch dimension dicts (None is treated as an empty list)
        * metric_unit - Cloudwatch metric unit string
        * metric_alarm_threshold - value at/above which the Cloudwatch alarm fires (None = no alarm)
        * metric_alarm_severity - severity of the ticket if the alarm fires (6+ means no ticket)
        * git_hash / git_repo_name - used to build a unique alarm name and description
        * reports_to_skip - number of initial reports to poll but NOT send to Cloudwatch
        * is_percent - whether to display the value as a percentage when printing
        """
        self.metric_name = metric_name
        self.metric_function = metric_function
        # Default is None rather than [] so instances never share a mutable default list
        self.metric_dimensions = metric_dimensions if metric_dimensions is not None else []
        self.metric_unit = metric_unit
        self.metric_alarm_threshold = metric_alarm_threshold
        self.metric_alarm_name = self.metric_name + "-" + git_repo_name + "-" + git_hash
        self.metric_alarm_description = 'Alarm for metric "' + self.metric_name + '" - git hash: ' + git_hash
        self.metric_value = None
        self.reports_to_skip = reports_to_skip
        self.metric_alarm_severity = metric_alarm_severity
        self.is_percent = is_percent

    # Gets the latest metric value from the metric_function callback.
    # (The annotation is a string so the class can be defined even when psutil
    # has not been imported yet - it is only needed when the callback runs.)
    def get_metric_value(self, psutil_process: "psutil.Process"):
        if self.metric_function is not None:
            self.metric_value = self.metric_function(psutil_process)
        return self.metric_value

    # Returns the data needed to send to Cloudwatch when posting metrics.
    # Returns None when this report should be skipped - either because
    # reports_to_skip has not been exhausted yet, or no value has been polled.
    def get_metric_cloudwatch_dictionary(self):
        if (self.reports_to_skip > 0):
            self.reports_to_skip -= 1
            return None  # skips sending to Cloudwatch

        if (self.metric_value is None):
            return None  # skips sending to Cloudwatch

        return {
            "MetricName": self.metric_name,
            "Dimensions": self.metric_dimensions,
            "Value": self.metric_value,
            "Unit": self.metric_unit
        }
| |
class DataSnapshot_Dashboard_Widget():
    """A single widget on the Cloudwatch dashboard, plus the metric names it graphs."""

    def __init__(self, widget_name, metric_namespace, metric_dimension, cloudwatch_region="us-east-1", widget_period=60) -> None:
        self.widget_name = widget_name
        self.metric_namespace = metric_namespace
        self.metric_dimension = metric_dimension
        self.region = cloudwatch_region
        self.widget_period = widget_period
        self.metric_list = []

    def add_metric_to_widget(self, new_metric_name):
        # Track the metric so it is included next time the widget dictionary is built.
        try:
            self.metric_list.append(new_metric_name)
        except Exception as e:
            print ("[DataSnapshot_Dashboard] ERROR - could not add metric to dashboard widget due to exception!")
            print ("[DataSnapshot_Dashboard] Exception: " + str(e))

    def remove_metric_from_widget(self, existing_metric_name):
        # A metric that is not present is reported (printed), never raised.
        try:
            self.metric_list.remove(existing_metric_name)
        except Exception as e:
            print ("[DataSnapshot_Dashboard] ERROR - could not remove metric from dashboard widget due to exception!")
            print ("[DataSnapshot_Dashboard] Exception: " + str(e))

    def get_widget_dictionary(self):
        """Return this widget in the dictionary layout that Cloudwatch's put_dashboard expects."""
        metric_rows = [
            [self.metric_namespace, metric, self.metric_dimension, metric]
            for metric in self.metric_list
        ]
        return {
            "type": "metric",
            "properties": {
                "metrics": metric_rows,
                "region": self.region,
                "title": self.widget_name,
                "period": self.widget_period,
            },
            "width": 14,
            "height": 10
        }
| |
| # ================================================================================ |
| |
# Class that keeps track of the metrics registered, sets up Cloudwatch and S3, and sends periodic reports
# Is the backbone of the reporting operation
class DataSnapshot():
    def __init__(self,
                 git_hash=None,
                 git_repo_name=None,
                 git_hash_as_namespace=False,
                 git_fixed_namespace_text="mqtt5_canary",
                 datetime_string=None,
                 output_log_filepath=None,
                 output_to_console=True,
                 cloudwatch_region="us-east-1",
                 cloudwatch_make_dashboard=False,
                 cloudwatch_teardown_alarms_on_complete=True,
                 cloudwatch_teardown_dashboard_on_complete=True,
                 s3_bucket_name="canary-wrapper-bucket",
                 s3_bucket_upload_on_complete=True,
                 lambda_name="CanarySendEmailLambda",
                 metric_frequency=None):
        """
        Sets up the snapshot: validates AWS credentials, creates the Cloudwatch, S3 and
        Lambda clients, derives the metric namespace from the Git information, and
        (optionally) opens the output log file.

        NOTE: setup failures do NOT raise. They set abort_due_to_internal_error (plus
        abort_due_to_internal_error_reason) and return, so callers must check that flag.
        """

        # Setting initial values
        # ==================
        self.first_metric_call = True
        self.metrics = []
        self.metrics_numbers = []
        self.metric_report_number = 0
        # How many of the most recent values to keep per metric for the all-zero check in post_metrics
        self.metric_report_non_zero_count = 4

        # Needed so we can initialize Cloudwatch alarms, etc, outside of the init function
        # but before we start sending data.
        # This boolean tracks whether we have done the post-initialization prior to sending the first report.
        self.perform_final_initialization = True

        # Watched by the thread creating the snapshot. Will cause the thread(s) to abort and return an error.
        self.abort_due_to_internal_error = False
        self.abort_due_to_internal_error_reason = ""
        self.abort_due_to_internal_error_due_to_credentials = False

        self.git_hash = None
        self.git_repo_name = None
        self.git_hash_as_namespace = git_hash_as_namespace
        self.git_fixed_namespace_text = git_fixed_namespace_text
        self.git_metric_namespace = None

        self.cloudwatch_region = cloudwatch_region
        self.cloudwatch_client = None
        self.cloudwatch_make_dashboard = cloudwatch_make_dashboard
        self.cloudwatch_teardown_alarms_on_complete = cloudwatch_teardown_alarms_on_complete
        self.cloudwatch_teardown_dashboard_on_complete = cloudwatch_teardown_dashboard_on_complete
        self.cloudwatch_dashboard_name = ""
        self.cloudwatch_dashboard_widgets = []

        self.s3_bucket_name = s3_bucket_name
        self.s3_client = None
        self.s3_bucket_upload_on_complete = s3_bucket_upload_on_complete

        self.output_to_file_filepath = output_log_filepath
        self.output_to_file = False
        self.output_file = None
        self.output_to_console = output_to_console

        self.lambda_client = None
        self.lambda_name = lambda_name

        self.datetime_string = datetime_string
        self.metric_frequency = metric_frequency
        # ==================

        # Check for valid credentials
        # ==================
        try:
            tmp_sts_client = boto3.client('sts')
            tmp_sts_client.get_caller_identity()
        except Exception as e:
            print ("[DataSnapshot] ERROR - AWS credentials are NOT valid!")
            self.abort_due_to_internal_error = True
            self.abort_due_to_internal_error_reason = "AWS credentials are NOT valid!"
            self.abort_due_to_internal_error_due_to_credentials = True
            return
        # ==================

        # Git related stuff
        # ==================
        if (git_hash is None or git_repo_name is None):
            print("[DataSnapshot] ERROR - a Git hash and repository name are REQUIRED for the canary wrapper to run!")
            self.abort_due_to_internal_error = True
            self.abort_due_to_internal_error_reason = "No Git hash and repository passed!"
            return

        self.git_hash = git_hash
        self.git_repo_name = git_repo_name

        if (self.git_hash_as_namespace == False):
            self.git_metric_namespace = self.git_fixed_namespace_text
        else:
            if (self.datetime_string is None):
                git_namespace_prepend_text = self.git_repo_name + "-" + self.git_hash
            else:
                git_namespace_prepend_text = self.git_repo_name + "/" + self.datetime_string + "-" + self.git_hash
            self.git_metric_namespace = git_namespace_prepend_text
        # ==================

        # Cloudwatch related stuff
        # ==================
        try:
            self.cloudwatch_client = boto3.client('cloudwatch', self.cloudwatch_region)
            self.cloudwatch_dashboard_name = self.git_metric_namespace
        except Exception as e:
            self.print_message("[DataSnapshot] ERROR - could not make Cloudwatch client due to exception!")
            self.print_message("[DataSnapshot] Exception: " + str(e))
            self.cloudwatch_client = None
            self.abort_due_to_internal_error = True
            self.abort_due_to_internal_error_reason = "Could not make Cloudwatch client!"
            return
        # ==================

        # S3 related stuff
        # ==================
        try:
            self.s3_client = boto3.client("s3")
        except Exception as e:
            self.print_message("[DataSnapshot] ERROR - could not make S3 client due to exception!")
            self.print_message("[DataSnapshot] Exception: " + str(e))
            self.s3_client = None
            self.abort_due_to_internal_error = True
            self.abort_due_to_internal_error_reason = "Could not make S3 client!"
            return
        # ==================

        # Lambda related stuff
        # ==================
        try:
            self.lambda_client = boto3.client("lambda", self.cloudwatch_region)
        except Exception as e:
            self.print_message("[DataSnapshot] ERROR - could not make Lambda client due to exception!")
            self.print_message("[DataSnapshot] Exception: " + str(e))
            self.lambda_client = None
            self.abort_due_to_internal_error = True
            self.abort_due_to_internal_error_reason = "Could not make Lambda client!"
            return
        # ==================

        # File output (logs) related stuff
        # ==================
        if (output_log_filepath is not None):
            self.output_to_file = True
            self.output_file = open(self.output_to_file_filepath, "w")
        else:
            self.output_to_file = False
            self.output_file = None
        # ==================

        self.print_message("[DataSnapshot] Data snapshot created!")

    # Cleans the class - closing any files, removing alarms, and sending data to S3.
    # Should be called at the end when you are totally finished shadowing metrics
    def cleanup(self, error_occurred=False):
        if (self.s3_bucket_upload_on_complete == True):
            self.export_result_to_s3_bucket(copy_output_log=True, log_is_error=error_occurred)

        self._cleanup_cloudwatch_alarms()
        if (self.cloudwatch_make_dashboard == True):
            self._cleanup_cloudwatch_dashboard()

        self.print_message("[DataSnapshot] Data snapshot cleaned!")

        if (self.output_file is not None):
            self.output_file.close()
            self.output_file = None

    # Utility function for printing messages.
    # Writes to the output log file (when enabled) and/or the console.
    def print_message(self, message):
        if self.output_to_file == True:
            self.output_file.write(message + "\n")
        if self.output_to_console == True:
            print(message, flush=True)

    # Utility function - adds the metric alarms to Cloudwatch. We do run this right before the first
    # collection of metrics so we can register metrics before we initialize Cloudwatch
    def _init_cloudwatch_pre_first_run(self):
        for metric in self.metrics:
            if (metric.metric_alarm_threshold is not None):
                self._add_cloudwatch_metric_alarm(metric)

        if (self.cloudwatch_make_dashboard == True):
            self._init_cloudwatch_pre_first_run_dashboard()

    # Utility function - adds the Cloudwatch Dashboard for the currently running data snapshot
    def _init_cloudwatch_pre_first_run_dashboard(self):
        try:
            # Remove the old dashboard if it exists before adding a new one
            self._cleanup_cloudwatch_dashboard()

            new_dashboard_widgets_array = []
            for widget in self.cloudwatch_dashboard_widgets:
                new_dashboard_widgets_array.append(widget.get_widget_dictionary())

            new_dashboard_body = {
                "start": "-PT1H",
                "widgets": new_dashboard_widgets_array,
            }
            new_dashboard_body_json = json.dumps(new_dashboard_body)

            self.cloudwatch_client.put_dashboard(
                DashboardName=self.cloudwatch_dashboard_name,
                DashboardBody=new_dashboard_body_json)
            self.print_message("[DataSnapshot] Added Cloudwatch dashboard successfully")
        except Exception as e:
            self.print_message("[DataSnapshot] ERROR - Cloudwatch client could not make dashboard due to exception!")
            self.print_message("[DataSnapshot] Exception: " + str(e))
            self.abort_due_to_internal_error = True
            self.abort_due_to_internal_error_reason = "Cloudwatch client could not make dashboard due to exception"
            return

    # Utility function - The function that adds each individual metric alarm.
    def _add_cloudwatch_metric_alarm(self, metric):
        if self.cloudwatch_client is None:
            self.print_message("[DataSnapshot] ERROR - Cloudwatch client not setup. Cannot register alarm")
            return

        try:
            self.cloudwatch_client.put_metric_alarm(
                AlarmName=metric.metric_alarm_name,
                AlarmDescription=metric.metric_alarm_description,
                MetricName=metric.metric_name,
                Namespace=self.git_metric_namespace,
                Statistic="Maximum",
                Dimensions=metric.metric_dimensions,
                Period=60,  # How long (in seconds) is an evaluation period?
                EvaluationPeriods=120,  # How many periods does it need to be invalid for?
                DatapointsToAlarm=1,  # How many data points need to be invalid?
                Threshold=metric.metric_alarm_threshold,
                ComparisonOperator="GreaterThanOrEqualToThreshold",
            )
        except Exception as e:
            self.print_message("[DataSnapshot] ERROR - could not register alarm for metric due to exception: " + metric.metric_name)
            self.print_message("[DataSnapshot] Exception: " + str(e))

    # Utility function - removes all the Cloudwatch alarms for the metrics
    def _cleanup_cloudwatch_alarms(self):
        if (self.cloudwatch_teardown_alarms_on_complete == True):
            try:
                for metric in self.metrics:
                    if (metric.metric_alarm_threshold is not None):
                        self.cloudwatch_client.delete_alarms(AlarmNames=[metric.metric_alarm_name])
            except Exception as e:
                self.print_message("[DataSnapshot] ERROR - could not delete alarms due to exception!")
                self.print_message("[DataSnapshot] Exception: " + str(e))

    # Utility function - removes all Cloudwatch dashboards created
    def _cleanup_cloudwatch_dashboard(self):
        if (self.cloudwatch_teardown_dashboard_on_complete == True):
            try:
                self.cloudwatch_client.delete_dashboards(DashboardNames=[self.cloudwatch_dashboard_name])
                self.print_message("[DataSnapshot] Cloudwatch Dashboards deleted successfully!")
            except Exception as e:
                self.print_message("[DataSnapshot] ERROR - dashboard cleaning function failed due to exception!")
                self.print_message("[DataSnapshot] Exception: " + str(e))
                self.abort_due_to_internal_error = True
                self.abort_due_to_internal_error_reason = "Cloudwatch dashboard cleaning function failed due to exception"
                return

    # Returns the results of the metric alarms. Will return a list containing tuples with the following structure:
    # [Boolean (False = the alarm is in the ALARM state), String (Name of the alarm that is in the ALARM state), int (severity of alarm)]
    # Currently this function will only return a list of failed alarms, so if the returned list is empty, then it means all
    # alarms did not get to the ALARM state in Cloudwatch for the registered metrics
    def get_cloudwatch_alarm_results(self):
        return self._check_cloudwatch_alarm_states()

    # Utility function - collects the metric alarm results and returns them in a list.
    def _check_cloudwatch_alarm_states(self):
        return_result_list = []

        tmp = None
        for metric in self.metrics:
            tmp = self._check_cloudwatch_alarm_state_metric(metric)
            if (tmp[1] is not None):
                # Do not cut a ticket for the "Alive_Alarm" that we use to check if the Canary is running.
                # (Fix: the original condition `"Alive_Alarm" in tmp[1] == False` is a chained
                # comparison - `("Alive_Alarm" in tmp[1]) and (tmp[1] == False)` - which is always
                # False for a string alarm name, so triggered alarms were never reported.)
                if ("Alive_Alarm" not in tmp[1]):
                    if (tmp[0] != True):
                        return_result_list.append(tmp)

        return return_result_list

    # Utility function - checks each individual alarm and returns a tuple with the following format:
    # [Boolean (False if the alarm is in the ALARM state, otherwise it is true), String (name of the alarm), Int (severity of alarm)]
    def _check_cloudwatch_alarm_state_metric(self, metric):
        alarms_response = self.cloudwatch_client.describe_alarms_for_metric(
            MetricName=metric.metric_name,
            Namespace=self.git_metric_namespace,
            Dimensions=metric.metric_dimensions)

        return_result = [True, None, metric.metric_alarm_severity]

        for metric_alarm_dict in alarms_response["MetricAlarms"]:
            if metric_alarm_dict["StateValue"] == "ALARM":
                return_result[0] = False
                return_result[1] = metric_alarm_dict["AlarmName"]
                break

        return return_result

    # Exports a file with the same name as the commit Git hash to an S3 bucket in a folder with the Git repo name.
    # By default, this file will only contain the Git hash.
    # If copy_output_log is true, then the output log will be copied into this file, which may be useful for debugging.
    def export_result_to_s3_bucket(self, copy_output_log=False, log_is_error=False):
        if (self.s3_client is None):
            self.print_message("[DataSnapshot] ERROR - No S3 client initialized! Cannot send log to S3")
            self.abort_due_to_internal_error = True
            self.abort_due_to_internal_error_reason = "S3 client not initialized and therefore cannot send log to S3"
            return

        s3_file = open(self.git_hash + ".log", "w")
        s3_file.write(self.git_hash)

        # Might be useful for debugging?
        if (copy_output_log == True and self.output_to_file == True):
            # Are we still writing? If so, then we need to close the file first so everything is written to it
            is_output_file_open_previously = False
            if (self.output_file is not None):
                self.output_file.close()
                is_output_file_open_previously = True
            self.output_file = open(self.output_to_file_filepath, "r")

            s3_file.write("\n\nOUTPUT LOG\n")
            s3_file.write("==========================================================================================\n")
            output_file_lines = self.output_file.readlines()
            for line in output_file_lines:
                s3_file.write(line)

            self.output_file.close()

            # If we were writing to the output previously, then we need to reopen it in append mode
            # so we can continue to write to it.
            # (Fix: the original assigned the reopened handle to self.output_to_file, clobbering the
            # boolean flag and leaving self.output_file as a closed handle, which would make every
            # later print_message crash when logging to file.)
            if (is_output_file_open_previously == True):
                self.output_file = open(self.output_to_file_filepath, "a")

        s3_file.close()

        # Upload to S3
        try:
            if (log_is_error == False):
                if (self.datetime_string is None):
                    self.s3_client.upload_file(self.git_hash + ".log", self.s3_bucket_name, self.git_repo_name + "/" + self.git_hash + ".log")
                else:
                    self.s3_client.upload_file(self.git_hash + ".log", self.s3_bucket_name, self.git_repo_name + "/" + self.datetime_string + "/" + self.git_hash + ".log")
            else:
                if (self.datetime_string is None):
                    self.s3_client.upload_file(self.git_hash + ".log", self.s3_bucket_name, self.git_repo_name + "/Failed_Logs/" + self.git_hash + ".log")
                else:
                    self.s3_client.upload_file(self.git_hash + ".log", self.s3_bucket_name, self.git_repo_name + "/Failed_Logs/" + self.datetime_string + "/" + self.git_hash + ".log")
            self.print_message("[DataSnapshot] Uploaded to S3!")
        except Exception as e:
            self.print_message("[DataSnapshot] ERROR - could not upload to S3 due to exception!")
            self.print_message("[DataSnapshot] Exception: " + str(e))
            self.abort_due_to_internal_error = True
            self.abort_due_to_internal_error_reason = "S3 client had exception and therefore could not upload log!"
            os.remove(self.git_hash + ".log")
            return

        # Delete the file when finished
        os.remove(self.git_hash + ".log")

    # Sends an email via a special lambda. The payload has to contain a message and a subject
    # * (REQUIRED) message is the message you want to send in the body of the email
    # * (REQUIRED) subject is the subject that the email will be sent with
    def lambda_send_email(self, message, subject):

        payload = {"Message": message, "Subject": subject}
        payload_string = json.dumps(payload)

        try:
            self.lambda_client.invoke(
                FunctionName=self.lambda_name,
                InvocationType="Event",
                ClientContext="MQTT Wrapper Script",
                Payload=payload_string
            )
        except Exception as e:
            self.print_message("[DataSnapshot] ERROR - could not send email via Lambda due to exception!")
            self.print_message("[DataSnapshot] Exception: " + str(e))
            self.abort_due_to_internal_error = True
            self.abort_due_to_internal_error_reason = "Lambda email function had an exception!"
            return

    # Registers a metric to be polled by the Snapshot.
    # * (REQUIRED) new_metric_name is the name of the metric. Cloudwatch will use this name
    # * (REQUIRED) new_metric_function is expected to be a pointer to a Python function and will not work if you pass a value/object
    # * (OPTIONAL) new_metric_unit is the metric unit. There is a list of possible metric unit types on the Boto3 documentation for Cloudwatch
    # * (OPTIONAL) new_metric_alarm_threshold is the value that the metric has to exceed in order to be registered as an alarm
    # * (OPTIONAL) new_reports_to_skip is the number of reports this metric will return nothing, but will get it's value.
    #     * Useful for CPU calculations that require deltas
    # * (OPTIONAL) new_metric_alarm_severity is the severity of the ticket if this alarm is triggered. A severity of 6+ means no ticket.
    # * (OPTIONAL) is_percent whether or not to display the metric as a percent when printing it (default=false)
    def register_metric(self, new_metric_name, new_metric_function, new_metric_unit="None",
                        new_metric_alarm_threshold=None, new_metric_reports_to_skip=0, new_metric_alarm_severity=6, is_percent=False):

        new_metric_dimensions = []

        if (self.git_hash_as_namespace == False):
            git_namespace_prepend_text = self.git_repo_name + "-" + self.git_hash
            new_metric_dimensions.append(
                {"Name": git_namespace_prepend_text, "Value": new_metric_name})
        else:
            new_metric_dimensions.append(
                {"Name": "System_Metrics", "Value": new_metric_name})

        new_metric = DataSnapshot_Metric(
            metric_name=new_metric_name,
            metric_function=new_metric_function,
            metric_dimensions=new_metric_dimensions,
            metric_unit=new_metric_unit,
            metric_alarm_threshold=new_metric_alarm_threshold,
            metric_alarm_severity=new_metric_alarm_severity,
            git_hash=self.git_hash,
            git_repo_name=self.git_repo_name,
            reports_to_skip=new_metric_reports_to_skip,
            is_percent=is_percent
        )
        self.metrics.append(new_metric)
        # append an empty list so we can track it's metrics over time
        self.metrics_numbers.append([])

    # Registers (or finds) a dashboard widget and adds the given metric names to it.
    def register_dashboard_widget(self, new_widget_name, metrics_to_add=None, new_widget_period=60):

        # None sentinel instead of a mutable default list (which would be shared across calls)
        if (metrics_to_add is None):
            metrics_to_add = []

        # We need to know what metric dimension to get the metric(s) from
        metric_dimension_string = ""
        if (self.git_hash_as_namespace == False):
            metric_dimension_string = self.git_repo_name + "-" + self.git_hash
        else:
            metric_dimension_string = "System_Metrics"

        widget = self._find_cloudwatch_widget(name=new_widget_name)
        if (widget is None):
            widget = DataSnapshot_Dashboard_Widget(
                widget_name=new_widget_name, metric_namespace=self.git_metric_namespace,
                metric_dimension=metric_dimension_string,
                cloudwatch_region=self.cloudwatch_region,
                widget_period=new_widget_period)
            self.cloudwatch_dashboard_widgets.append(widget)

        for metric in metrics_to_add:
            self.register_metric_to_dashboard_widget(widget_name=new_widget_name, metric_name=metric)

    # Adds a single metric (by name) to an existing dashboard widget.
    def register_metric_to_dashboard_widget(self, widget_name, metric_name, widget=None):
        if widget is None:
            widget = self._find_cloudwatch_widget(name=widget_name)
            if widget is None:
                print ("[DataSnapshot] ERROR - could not find widget with name: " + widget_name, flush=True)
                return

        # Adjust metric name so it has the git hash, repo, etc
        metric_name_formatted = metric_name

        widget.add_metric_to_widget(new_metric_name=metric_name_formatted)
        return

    # Removes a single metric (by name) from an existing dashboard widget.
    def remove_metric_from_dashboard_widget(self, widget_name, metric_name, widget=None):
        if widget is None:
            widget = self._find_cloudwatch_widget(name=widget_name)
            if widget is None:
                print ("[DataSnapshot] ERROR - could not find widget with name: " + widget_name, flush=True)
                return
        widget.remove_metric_from_widget(existing_metric_name=metric_name)
        return

    # Utility function - returns the registered widget with the given name, or None if not found.
    def _find_cloudwatch_widget(self, name):
        result = None
        for widget in self.cloudwatch_dashboard_widgets:
            if widget.widget_name == name:
                return widget
        return result

    # Prints the metrics to the console
    def export_metrics_console(self):
        datetime_now = datetime.datetime.now()
        datetime_string = datetime_now.strftime("%d-%m-%Y/%H:%M:%S")

        self.print_message("\n[DataSnapshot] Metric report: " + str(self.metric_report_number) + " (" + datetime_string + ")")
        for metric in self.metrics:
            if (metric.is_percent == True):
                self.print_message("    " + metric.metric_name +
                                   " - value: " + str(metric.metric_value) + "%")
            else:
                self.print_message("    " + metric.metric_name +
                                   " - value: " + str(metric.metric_value))
        self.print_message("")

    # Sends all registered metrics to Cloudwatch.
    # Does NOT need to called on loop. Call post_metrics on loop to send all the metrics as expected.
    # This is just the Cloudwatch part of that loop.
    def export_metrics_cloudwatch(self):
        if (self.cloudwatch_client is None):
            self.print_message("[DataSnapshot] Error - cannot export Cloudwatch metrics! Cloudwatch was not initialized.")
            self.abort_due_to_internal_error = True
            self.abort_due_to_internal_error_reason = "Could not export Cloudwatch metrics due to no Cloudwatch client initialized!"
            return

        self.print_message("[DataSnapshot] Preparing to send to Cloudwatch...")
        metrics_data = []
        metric_data_tmp = None
        for metric in self.metrics:
            metric_data_tmp = metric.get_metric_cloudwatch_dictionary()
            if (metric_data_tmp is not None):
                metrics_data.append(metric_data_tmp)

        if (len(metrics_data) == 0):
            self.print_message("[DataSnapshot] INFO - no metric data to send. Skipping...")
            return

        try:
            self.cloudwatch_client.put_metric_data(
                Namespace=self.git_metric_namespace,
                MetricData=metrics_data)
            self.print_message("[DataSnapshot] Metrics sent to Cloudwatch.")
        except Exception as e:
            self.print_message("[DataSnapshot] Error - something when wrong posting cloudwatch metrics!")
            self.print_message("[DataSnapshot] Exception: " + str(e))
            self.abort_due_to_internal_error = True
            self.abort_due_to_internal_error_reason = "Could not export Cloudwatch metrics due to exception in Cloudwatch client!"
            return

    # Call this at a set interval to post the metrics to Cloudwatch, etc.
    # This is the function you want to call repeatedly after you have everything setup.
    # (String annotation so the class can be defined even before psutil is imported.)
    def post_metrics(self, psutil_process: "psutil.Process"):
        if (self.perform_final_initialization == True):
            self.perform_final_initialization = False
            self._init_cloudwatch_pre_first_run()

        # Update the metric values internally
        for i in range(0, len(self.metrics)):
            metric_value = self.metrics[i].get_metric_value(psutil_process)
            # Newest value goes to the front of the history list
            self.metrics_numbers[i].insert(0, metric_value)

            # Only keep the last metric_report_non_zero_count results
            if (len(self.metrics_numbers[i]) > self.metric_report_non_zero_count):
                amount_to_delete = len(self.metrics_numbers[i]) - self.metric_report_non_zero_count
                del self.metrics_numbers[i][-amount_to_delete:]
            # If we have metric_report_non_zero_count amount of metrics, make sure there is at least one
            # non-zero. If it is all zero, then print a log so we can easily find it
            if (len(self.metrics_numbers[i]) == self.metric_report_non_zero_count):
                non_zero_found = False
                for j in range(0, len(self.metrics_numbers[i])):
                    if (self.metrics_numbers[i][j] != 0.0 and self.metrics_numbers[i][j] != None):
                        non_zero_found = True
                        break
                if (non_zero_found == False):
                    self.print_message("\n[DataSnapshot] METRIC ZERO ERROR!")
                    self.print_message(f"[DataSnapshot] Metric index {i} has been zero for last {self.metric_report_non_zero_count} reports!")
                    self.print_message("\n")

        self.metric_report_number += 1

        self.export_metrics_console()
        self.export_metrics_cloudwatch()

    # Prints general diagnosis information: repository, dependencies (a ";"-separated
    # name;hash;name;hash... string), metric frequency, and the registered metrics.
    def output_diagnosis_information(self, dependencies_list):

        # Print general diagnosis information
        self.print_message("\n========== Canary Wrapper diagnosis information ==========")
        self.print_message("\nRunning Canary for repository: " + self.git_repo_name)
        self.print_message("\t Commit hash: " + self.git_hash)

        if not dependencies_list == "":
            self.print_message("\nDependencies:")
            dependencies_list = dependencies_list.split(";")
            dependencies_list_found_hash = False
            for i in range(0, len(dependencies_list)):
                # Entries alternate name/hash; skip the hash entry we already printed
                if (dependencies_list_found_hash == True):
                    dependencies_list_found_hash = False
                    continue
                self.print_message("* " + dependencies_list[i])
                if (i + 1 < len(dependencies_list)):
                    self.print_message("\t Commit hash: " + dependencies_list[i + 1])
                    dependencies_list_found_hash = True
                else:
                    self.print_message("\t Commit hash: Unknown")

        if (self.metric_frequency is not None):
            self.print_message("\nMetric Snapshot Frequency: " + str(self.metric_frequency) + " seconds")
        self.print_message("\nMetrics:")
        for metric in self.metrics:
            self.print_message("* " + metric.metric_name)
            if metric.metric_alarm_threshold is not None:
                self.print_message("\t Alarm Threshold: " + str(metric.metric_alarm_threshold))
                self.print_message("\t Alarm Severity: " + str(metric.metric_alarm_severity))
            else:
                self.print_message("\t No alarm set for metric.")

        self.print_message("\n")
        self.print_message("==========================================================")
        self.print_message("\n")
| |
| # ================================================================================ |
| |
| class SnapshotMonitor(): |
| def __init__(self, wrapper_data_snapshot, wrapper_metrics_wait_time) -> None: |
| |
| self.data_snapshot = wrapper_data_snapshot |
| self.had_internal_error = False |
| self.error_due_to_credentials = False |
| self.internal_error_reason = "" |
| self.error_due_to_alarm = False |
| |
| self.can_cut_ticket = False |
| self.has_cut_ticket = False |
| |
| # A list of all the alarms triggered in the last check, cached for later |
| # NOTE - this is only the alarm names! Not the severity. This just makes it easier to process |
| self.cloudwatch_current_alarms_triggered = [] |
| |
| # Check for errors |
| if (self.data_snapshot.abort_due_to_internal_error == True): |
| self.had_internal_error = True |
| self.internal_error_reason = "Could not initialize DataSnapshot. Likely credentials are not setup!" |
| if (self.data_snapshot.abort_due_to_internal_error_due_to_credentials == True): |
| self.error_due_to_credentials = True |
| self.data_snapshot.cleanup() |
| return |
| |
| # How long to wait before posting a metric |
| self.metric_post_timer = 0 |
| self.metric_post_timer_time = wrapper_metrics_wait_time |
| |
| |
    # Registers a metric with the underlying DataSnapshot so it gets polled and posted.
    # Arguments mirror DataSnapshot.register_metric (see that function for details).
    # On failure, sets had_internal_error / internal_error_reason instead of raising.
    def register_metric(self, new_metric_name, new_metric_function, new_metric_unit="None", new_metric_alarm_threshold=None,
                        new_metric_reports_to_skip=0, new_metric_alarm_severity=6):

        try:
            self.data_snapshot.register_metric(
                new_metric_name=new_metric_name,
                new_metric_function=new_metric_function,
                new_metric_unit=new_metric_unit,
                new_metric_alarm_threshold=new_metric_alarm_threshold,
                new_metric_reports_to_skip=new_metric_reports_to_skip,
                new_metric_alarm_severity=new_metric_alarm_severity)
        except Exception as e:
            # NOTE(review): SnapshotMonitor does not define print_message in the code visible
            # here - if it is not defined elsewhere in this class, these two calls will raise
            # AttributeError inside the exception handler. Confirm, or delegate to
            # self.data_snapshot.print_message like check_alarms_for_new_alarms does.
            self.print_message("[SnaptshotMonitor] ERROR - could not register metric in data snapshot due to exception!")
            self.print_message("[SnaptshotMonitor] Exception: " + str(e))
            self.had_internal_error = True
            self.internal_error_reason = "Could not register metric in data snapshot due to exception"
            return
| |
| def register_dashboard_widget(self, new_widget_name, metrics_to_add=[], widget_period=60): |
| self.data_snapshot.register_dashboard_widget(new_widget_name=new_widget_name, metrics_to_add=metrics_to_add, new_widget_period=widget_period) |
| |
| def output_diagnosis_information(self, dependencies=""): |
| self.data_snapshot.output_diagnosis_information(dependencies_list=dependencies) |
| |
| def check_alarms_for_new_alarms(self, triggered_alarms): |
| |
| if len(triggered_alarms) > 0: |
| self.data_snapshot.print_message( |
| "WARNING - One or more alarms are in state of ALARM") |
| |
| old_alarms_still_active = [] |
| new_alarms = [] |
| new_alarms_highest_severity = 6 |
| new_alarm_found = True |
| new_alarm_ticket_description = "Canary has metrics in ALARM state!\n\nMetrics in alarm:\n" |
| |
| for triggered_alarm in triggered_alarms: |
| new_alarm_found = True |
| |
| # Is this a new alarm? |
| for old_alarm_name in self.cloudwatch_current_alarms_triggered: |
| if (old_alarm_name == triggered_alarm[1]): |
| new_alarm_found = False |
| old_alarms_still_active.append(triggered_alarm[1]) |
| |
| new_alarm_ticket_description += "* (STILL IN ALARM) " + triggered_alarm[1] + "\n" |
| new_alarm_ticket_description += "\tSeverity: " + str(triggered_alarm[2]) |
| new_alarm_ticket_description += "\n" |
| break |
| |
| # If it is a new alarm, then add it to our list so we can cut a new ticket |
| if (new_alarm_found == True): |
| self.data_snapshot.print_message(' (NEW) Alarm with name "' + triggered_alarm[1] + '" is in the ALARM state!') |
| new_alarms.append(triggered_alarm[1]) |
| if (triggered_alarm[2] < new_alarms_highest_severity): |
| new_alarms_highest_severity = triggered_alarm[2] |
| new_alarm_ticket_description += "* " + triggered_alarm[1] + "\n" |
| new_alarm_ticket_description += "\tSeverity: " + str(triggered_alarm[2]) |
| new_alarm_ticket_description += "\n" |
| |
| |
| if len(new_alarms) > 0: |
| if (self.can_cut_ticket == True): |
| cut_ticket_using_cloudwatch( |
| git_repo_name=self.data_snapshot.git_repo_name, |
| git_hash=self.data_snapshot.git_hash, |
| git_hash_as_namespace=False, |
| git_fixed_namespace_text=self.data_snapshot.git_fixed_namespace_text, |
| cloudwatch_region="us-east-1", |
| ticket_description="New metric(s) went into alarm for the Canary! Metrics in alarm: " + str(new_alarms), |
| ticket_reason="New metric(s) went into alarm", |
| ticket_allow_duplicates=True, |
| ticket_category="AWS", |
| ticket_item="IoT SDK for CPP", |
| ticket_group="AWS IoT Device SDK", |
| ticket_type="SDKs and Tools", |
| ticket_severity=4) |
| self.has_cut_ticket = True |
| |
| # Cache the new alarms and the old alarms |
| self.cloudwatch_current_alarms_triggered = old_alarms_still_active + new_alarms |
| |
| else: |
| self.cloudwatch_current_alarms_triggered.clear() |
| |
| |
| def monitor_loop_function(self, psutil_process : psutil.Process, time_passed=30): |
| # Check for internal errors |
| if (self.data_snapshot.abort_due_to_internal_error == True): |
| self.had_internal_error = True |
| self.internal_error_reason = "Data Snapshot internal error: " + self.data_snapshot.abort_due_to_internal_error_reason |
| return |
| |
| try: |
| # Poll the metric alarms |
| if (self.had_internal_error == False): |
| # Get a report of all the alarms that might have been set to an alarm state |
| triggered_alarms = self.data_snapshot.get_cloudwatch_alarm_results() |
| self.check_alarms_for_new_alarms(triggered_alarms) |
| except Exception as e: |
| self.print_message("[SnaptshotMonitor] ERROR - exception occurred checking metric alarms!") |
| self.print_message("[SnaptshotMonitor] (Likely session credentials expired)") |
| self.had_internal_error = True |
| self.internal_error_reason = "Exception occurred checking metric alarms! Likely session credentials expired" |
| return |
| |
| if (self.metric_post_timer <= 0): |
| if (self.had_internal_error == False): |
| try: |
| self.data_snapshot.post_metrics(psutil_process) |
| except Exception as e: |
| self.print_message("[SnaptshotMonitor] ERROR - exception occurred posting metrics!") |
| self.print_message("[SnaptshotMonitor] (Likely session credentials expired)") |
| |
| print (e, flush=True) |
| |
| self.had_internal_error = True |
| self.internal_error_reason = "Exception occurred posting metrics! Likely session credentials expired" |
| return |
| |
| # reset the timer |
| self.metric_post_timer += self.metric_post_timer_time |
| |
| # Gather and post the metrics |
| self.metric_post_timer -= time_passed |
| |
| |
| def send_email(self, email_body, email_subject_text_append=None): |
| if (email_subject_text_append != None): |
| self.data_snapshot.lambda_send_email(email_body, "Canary: " + self.data_snapshot.git_repo_name + ":" + self.data_snapshot.git_hash + " - " + email_subject_text_append) |
| else: |
| self.data_snapshot.lambda_send_email(email_body, "Canary: " + self.data_snapshot.git_repo_name + ":" + self.data_snapshot.git_hash) |
| |
| |
| def stop_monitoring(self): |
| # Stub - just added for consistency |
| pass |
| |
| |
| def start_monitoring(self): |
| # Stub - just added for consistency |
| pass |
| |
| |
| def restart_monitoring(self): |
| # Stub - just added for consistency |
| pass |
| |
| |
| def cleanup_monitor(self, error_occurred=False): |
| self.data_snapshot.cleanup(error_occurred=error_occurred) |
| |
| def print_message(self, message): |
| if (self.data_snapshot != None): |
| self.data_snapshot.print_message(message) |
| else: |
| print(message, flush=True) |
| |
| # ================================================================================ |
| |
class ApplicationMonitor():
    """Launches and watches the canary application as a subprocess.

    The application's stdout is piped through "tee" into a file so it can be echoed
    after the process stops. Crashes (non-zero exit) and normal exits are reported
    through error_has_occurred / error_reason / error_code rather than raised.
    """

    def __init__(self, wrapper_application_path, wrapper_application_arguments, wrapper_application_restart_on_finish=True, data_snapshot=None) -> None:
        self.application_process = None         # subprocess.Popen handle; None when nothing is running
        self.application_process_psutil = None  # psutil.Process handle for metric collection
        self.error_has_occurred = False
        self.error_due_to_credentials = False
        self.error_reason = ""
        self.error_code = 0
        self.wrapper_application_path = wrapper_application_path
        self.wrapper_application_arguments = wrapper_application_arguments
        self.wrapper_application_restart_on_finish = wrapper_application_restart_on_finish
        self.data_snapshot = data_snapshot

        self.stdout_file_path = "Canary_Stdout_File.txt"

    def start_monitoring(self):
        """Starts the canary application subprocess.

        Logs an error (without setting error flags) if a process is already running.
        """
        self.print_message("[ApplicationMonitor] Starting to monitor application...")

        if self.application_process is None:
            try:
                canary_command = self.wrapper_application_path + " " + self.wrapper_application_arguments
                # NOTE(review): shell=True with a concatenated command string. The path and
                # arguments are wrapper configuration here, but this would be shell-injectable
                # if they ever came from untrusted input. The shell is needed for the "| tee"
                # pipe, which mirrors stdout into self.stdout_file_path.
                # NOTE(review): with "cmd | tee file" the exit code seen by poll() is
                # presumably tee's, not the canary's - confirm crashes are actually detected.
                self.application_process = subprocess.Popen(canary_command + " | tee " + self.stdout_file_path, shell=True)
                # NOTE(review): this pid belongs to the shell running the pipeline, so the
                # psutil handle may measure the shell rather than the canary - TODO confirm.
                self.application_process_psutil = psutil.Process(self.application_process.pid)
                self.print_message("[ApplicationMonitor] Application started...")
            except Exception as e:
                self.print_message("[ApplicationMonitor] ERROR - Could not launch Canary/Application due to exception!")
                self.print_message("[ApplicationMonitor] Exception: " + str(e))
                self.error_has_occurred = True
                self.error_reason = "Could not launch Canary/Application due to exception"
                self.error_code = 1
                return
        else:
            self.print_message("[ApplicationMonitor] ERROR - Monitor already has an application process! Cannot monitor two applications with one monitor class!")

    def restart_monitoring(self):
        """Stops the current application process and starts a new one.

        Sets error flags if no process was running in the first place.
        """
        self.print_message("[ApplicationMonitor] Restarting monitor application...")

        if self.application_process is not None:
            try:
                self.stop_monitoring()
                self.start_monitoring()
                self.print_message("\n[ApplicationMonitor] Restarted monitor application!")
                self.print_message("================================================================================")
            except Exception as e:
                self.print_message("[ApplicationMonitor] ERROR - Could not restart Canary/Application due to exception!")
                self.print_message("[ApplicationMonitor] Exception: " + str(e))
                self.error_has_occurred = True
                self.error_reason = "Could not restart Canary/Application due to exception"
                self.error_code = 1
                return
        else:
            self.print_message("[ApplicationMonitor] ERROR - Application process restart called but process is/was not running!")
            self.error_has_occurred = True
            self.error_reason = "Could not restart Canary/Application due to application process not being started initially"
            self.error_code = 1
            return


    def stop_monitoring(self):
        """Terminates the application process (if any) and echoes its captured stdout."""
        self.print_message("[ApplicationMonitor] Stopping monitor application...")
        if self.application_process is not None:
            self.application_process.terminate()
            self.application_process.wait()
            self.print_message("[ApplicationMonitor] Stopped monitor application!")
            self.application_process = None
            self.print_stdout()
        else:
            self.print_message("[ApplicationMonitor] ERROR - cannot stop monitor application because no process is found!")

    def print_stdout(self):
        """Prints the tee'd stdout file (if present) and then deletes it."""
        if (os.path.isfile(self.stdout_file_path)):
            self.print_message("Just finished Application STDOUT: ")
            with open(self.stdout_file_path, "r") as stdout_file:
                self.print_message(stdout_file.read())
            os.remove(self.stdout_file_path)

    def monitor_loop_function(self, time_passed=30):
        """One iteration of the monitor loop: polls the process and reacts if it exited.

        A non-zero exit sets the error flags; a clean exit either restarts the
        application (wrapper_application_restart_on_finish) or flags completion
        with error_code 0.
        """
        if self.application_process is not None:

            application_process_return_code = None
            try:
                application_process_return_code = self.application_process.poll()
            except Exception as e:
                self.print_message("[ApplicationMonitor] ERROR - exception occurred while trying to poll application status!")
                self.print_message("[ApplicationMonitor] Exception: " + str(e))
                self.error_has_occurred = True
                self.error_reason = "Exception when polling application status"
                self.error_code = 1
                return

            # If it is not none, then the application finished
            if application_process_return_code is not None:

                self.print_message("[ApplicationMonitor] Monitor application has stopped! Processing result...")

                if (application_process_return_code != 0):
                    self.print_message("[ApplicationMonitor] ERROR - Something Crashed in Canary/Application!")
                    self.print_message("[ApplicationMonitor] Error code: " + str(application_process_return_code))

                    self.error_has_occurred = True
                    self.error_reason = "Canary application crashed!"
                    self.error_code = application_process_return_code
                else:
                    # Should we restart?
                    if self.wrapper_application_restart_on_finish:
                        self.print_message("[ApplicationMonitor] NOTE - Canary finished running and is restarting...")
                        self.restart_monitoring()
                    else:
                        self.print_message("[ApplicationMonitor] Monitor application has stopped and monitor is not supposed to restart... Finishing...")
                        self.error_has_occurred = True
                        self.error_reason = "Canary Application Finished"
                        self.error_code = 0
            else:
                self.print_message("[ApplicationMonitor] Monitor application is still running...")

    def cleanup_monitor(self, error_occurred=False):
        # Stub - nothing to clean up; kept for interface consistency with the other monitors
        pass

    def print_message(self, message):
        """Prints through the DataSnapshot when available (so messages reach its log), else stdout."""
        if self.data_snapshot is not None:
            self.data_snapshot.print_message(message)
        else:
            print(message, flush=True)
| |
| # ================================================================================ |
| |
class S3Monitor():
    """Watches an S3 object for new versions and swaps the local canary binary for it.

    check_for_file_change() flags s3_file_needs_replacing when a newer object
    version appears; replace_current_file_for_new_file() downloads it (unzipping if
    configured) and installs it at canary_local_application_path. Errors are
    surfaced through had_internal_error / internal_error_reason rather than raised.
    """

    def __init__(self, s3_bucket_name, s3_file_name, s3_file_name_in_zip, canary_local_application_path, data_snapshot) -> None:
        self.s3_client = None
        # Version id / modification time of the object version we last saw
        self.s3_current_object_version_id = None
        self.s3_current_object_last_modified = None
        self.s3_bucket_name = s3_bucket_name
        self.s3_file_name = s3_file_name
        self.s3_file_name_only_path, self.s3_file_name_only_extension = os.path.splitext(s3_file_name)
        self.data_snapshot = data_snapshot

        self.canary_local_application_path = canary_local_application_path

        # Optional: name of the file to pull out of the downloaded zip
        self.s3_file_name_in_zip = s3_file_name_in_zip
        self.s3_file_name_in_zip_only_path = None
        self.s3_file_name_in_zip_only_extension = None
        if self.s3_file_name_in_zip is not None:
            self.s3_file_name_in_zip_only_path, self.s3_file_name_in_zip_only_extension = os.path.splitext(s3_file_name_in_zip)

        # Set by check_for_file_change; consumed by whoever calls replace_current_file_for_new_file
        self.s3_file_needs_replacing = False

        self.had_internal_error = False
        self.error_due_to_credentials = False
        self.internal_error_reason = ""

        # Check for valid credentials
        # ==================
        try:
            tmp_sts_client = boto3.client('sts')
            tmp_sts_client.get_caller_identity()
        except Exception as e:
            self.print_message("[S3Monitor] ERROR - (S3 Check) AWS credentials are NOT valid!")
            self.had_internal_error = True
            self.error_due_to_credentials = True
            self.internal_error_reason = "AWS credentials are NOT valid!"
            return
        # ==================

        try:
            self.s3_client = boto3.client("s3")
        except Exception as e:
            self.print_message("[S3Monitor] ERROR - (S3 Check) Could not make S3 client")
            self.had_internal_error = True
            self.internal_error_reason = "Could not make S3 client for S3 Monitor"
            return


    def check_for_file_change(self):
        """Polls S3 and flags s3_file_needs_replacing if the latest object version changed."""
        try:
            version_check_response = self.s3_client.list_object_versions(
                Bucket=self.s3_bucket_name,
                Prefix=self.s3_file_name_only_path)
            if "Versions" in version_check_response:
                for version in version_check_response["Versions"]:
                    if version["IsLatest"]:
                        if (version["VersionId"] != self.s3_current_object_version_id or
                            version["LastModified"] != self.s3_current_object_last_modified):

                            self.print_message("[S3Monitor] Found new version of Canary/Application in S3!")
                            self.print_message("[S3Monitor] Changing running Canary/Application to new one...")

                            # Will be checked by thread to trigger replacing the file
                            self.s3_file_needs_replacing = True

                            self.s3_current_object_version_id = version["VersionId"]
                            self.s3_current_object_last_modified = version["LastModified"]
                            return

        except Exception as e:
            self.print_message("[S3Monitor] ERROR - Could not check for new version of file in S3 due to exception!")
            self.print_message("[S3Monitor] Exception: " + str(e))
            self.had_internal_error = True
            self.internal_error_reason = "Could not check for S3 file due to exception in S3 client"


    def replace_current_file_for_new_file(self):
        """Downloads the latest S3 object and installs it at canary_local_application_path.

        Unzips first when s3_file_name_in_zip is configured. Any failure sets
        had_internal_error / internal_error_reason and aborts the replacement.
        """
        import stat  # local import: only needed here for the executable-bit constant

        try:
            self.print_message("[S3Monitor] Making directory...")
            if not os.path.exists("tmp"):
                os.makedirs("tmp")
        except Exception as e:
            self.print_message("[S3Monitor] ERROR - could not make tmp directory to place S3 file into!")
            self.had_internal_error = True
            self.internal_error_reason = "Could not make TMP folder for S3 file download"
            return

        # Download the file
        new_file_path = "tmp/new_file" + self.s3_file_name_only_extension
        try:
            self.print_message("[S3Monitor] Downloading file...")
            s3_resource = boto3.resource("s3")
            s3_resource.meta.client.download_file(self.s3_bucket_name, self.s3_file_name, new_file_path)
        except Exception as e:
            self.print_message("[S3Monitor] ERROR - could not download latest S3 file into TMP folder!")
            self.had_internal_error = True
            self.internal_error_reason = "Could not download latest S3 file into TMP folder"
            return

        # Is it a zip file?
        if self.s3_file_name_in_zip is not None:
            self.print_message("[S3Monitor] New file is zip file. Unzipping...")
            try:
                # Unzip it!
                with zipfile.ZipFile(new_file_path, 'r') as zip_file:
                    zip_file.extractall("tmp/new_file_zip")
                new_file_path = "tmp/new_file_zip/" + self.s3_file_name_in_zip_only_path + self.s3_file_name_in_zip_only_extension
            except Exception as e:
                # Route extraction failures through the error flags like every other
                # failure path, instead of letting the exception escape the monitor.
                self.print_message("[S3Monitor] ERROR - could not unzip the downloaded S3 file!")
                self.print_message("[S3Monitor] Exception: " + str(e))
                self.had_internal_error = True
                self.internal_error_reason = "Could not unzip the downloaded S3 file"
                return

        try:
            # is there a file already present there?
            if os.path.exists(self.canary_local_application_path):
                os.remove(self.canary_local_application_path)

            self.print_message("[S3Monitor] Moving file...")
            os.replace(new_file_path, self.canary_local_application_path)
            self.print_message("[S3Monitor] Getting execution rights...")
            # os.chmod instead of os.system("chmod u+x ..."): no shell involved, and
            # paths containing spaces or shell metacharacters are handled correctly.
            os.chmod(
                self.canary_local_application_path,
                os.stat(self.canary_local_application_path).st_mode | stat.S_IXUSR)

        except Exception as e:
            self.print_message("[S3Monitor] ERROR - could not move file into local application path due to exception!")
            self.print_message("[S3Monitor] Exception: " + str(e))
            self.had_internal_error = True
            self.internal_error_reason = "Could not move file into local application path"
            return

        self.print_message("[S3Monitor] New file downloaded and moved into correct location!")
        self.s3_file_needs_replacing = False


    def stop_monitoring(self):
        # Stub - just added for consistency
        pass


    def start_monitoring(self):
        # Stub - just added for consistency
        pass


    def restart_monitoring(self):
        # Stub - just added for consistency
        pass


    def cleanup_monitor(self):
        # Stub - just added for consistency
        pass

    def monitor_loop_function(self, time_passed=30):
        """One iteration of the monitor loop: just polls S3 for a new file version."""
        self.check_for_file_change()

    def print_message(self, message):
        """Prints through the DataSnapshot when available (so messages reach its log), else stdout."""
        if self.data_snapshot is not None:
            self.data_snapshot.print_message(message)
        else:
            print(message, flush=True)
| |
| # ================================================================================ |
| |
| |
| # Cuts a ticket to SIM using a temporary Cloudwatch metric that is quickly created, triggered, and destroyed. |
| # Can be called in any thread - creates its own Cloudwatch client and any data it needs is passed in. |
| # |
| # See (https://w.amazon.com/bin/view/CloudWatchAlarms/Internal/CloudWatchAlarmsSIMTicketing) for more details |
| # on how the alarm is sent using Cloudwatch. |
def cut_ticket_using_cloudwatch(
        ticket_description="Description here!",
        ticket_reason="Reason here!",
        ticket_severity=5,
        ticket_category="AWS",
        ticket_type="SDKs and Tools",
        ticket_item="IoT SDK for CPP",
        ticket_group="AWS IoT Device SDK",
        ticket_allow_duplicates=False,
        git_repo_name="REPO NAME",
        git_hash="HASH",
        git_hash_as_namespace=False,
        git_fixed_namespace_text="mqtt5_canary",
        cloudwatch_region="us-east-1"):
    """Cuts a SIM ticket by creating a temporary Cloudwatch alarm, forcing it into
    the ALARM state (which triggers the SIM ticketing action on its ARN), and then
    deleting the alarm again. Builds its own Cloudwatch client, so it is safe to
    call from any thread."""

    # The metric namespace is either the fixed text or "<repo>-<hash>", per the flag
    if git_hash_as_namespace:
        git_metric_namespace = git_repo_name + "-" + git_hash
    else:
        git_metric_namespace = git_fixed_namespace_text

    try:
        cloudwatch_client = boto3.client('cloudwatch', cloudwatch_region)
        ticket_alarm_name = git_repo_name + "-" + git_hash + "-AUTO-TICKET"
    except Exception as e:
        print ("ERROR - could not create Cloudwatch client to make ticket metric alarm due to exception!")
        print ("Exception: " + str(e), flush=True)
        return

    # Single dimension pointing at the temporary alarm metric
    if git_hash_as_namespace:
        dimension_name = "System_Metrics"
    else:
        dimension_name = git_repo_name + "-" + git_hash
    new_metric_dimensions = [{"Name": dimension_name, "Value": ticket_alarm_name}]

    # The ticketing ARN encodes all the ticket fields
    ticket_arn = f"arn:aws:cloudwatch::cwa-internal:ticket:{ticket_severity}:{ticket_category}:{ticket_type}:{ticket_item}:{ticket_group}:"
    if ticket_allow_duplicates:
        # use "DO-NOT-DEDUPE" so we can run the same commit again and it will cut another ticket.
        ticket_arn += "DO-NOT-DEDUPE"
    # In the ticket ARN, all spaces need to be replaced with +
    ticket_arn = ticket_arn.replace(" ", "+")

    ticket_alarm_description = f"AUTO CUT CANARY WRAPPER TICKET\n\nREASON: {ticket_reason}\n\nDESCRIPTION: {ticket_description}\n\n"

    # Register a metric alarm so it can auto-cut a ticket for us
    try:
        cloudwatch_client.put_metric_alarm(
            AlarmName=ticket_alarm_name,
            AlarmDescription=ticket_alarm_description,
            MetricName=ticket_alarm_name,
            Namespace=git_metric_namespace,
            Statistic="Maximum",
            Dimensions=new_metric_dimensions,
            Period=60,  # How long (in seconds) is an evaluation period?
            EvaluationPeriods=1,  # How many periods does it need to be invalid for?
            DatapointsToAlarm=1,  # How many data points need to be invalid?
            Threshold=1,
            ComparisonOperator="GreaterThanOrEqualToThreshold",
            # The data above does not really matter - it just needs to be valid input data.
            # This is the part that tells Cloudwatch to cut the ticket
            AlarmActions=[ticket_arn]
        )
    except Exception as e:
        print ("ERROR - could not create ticket metric alarm due to exception!")
        print ("Exception: " + str(e), flush=True)
        return

    # Trigger the alarm so it cuts the ticket
    try:
        cloudwatch_client.set_alarm_state(
            AlarmName=ticket_alarm_name,
            StateValue="ALARM",
            StateReason="AUTO TICKET CUT")
    except Exception as e:
        print ("ERROR - could not cut ticket due to exception!")
        print ("Exception: " + str(e), flush=True)
        return

    print("Waiting for ticket metric to trigger...", flush=True)
    # Wait a little bit (2 seconds)...
    time.sleep(2)

    # Remove the metric
    print("Removing ticket metric...", flush=True)
    cloudwatch_client.delete_alarms(AlarmNames=[ticket_alarm_name])

    print ("Finished cutting ticket via Cloudwatch!", flush=True)
    return
| |
| # A helper function that gets the majority of the ticket information from the arguments result from argparser. |
def cut_ticket_using_cloudwatch_from_args(
        ticket_description="",
        ticket_reason="",
        ticket_severity=6,
        arguments=None):
    """Thin wrapper around cut_ticket_using_cloudwatch that fills in most of the
    ticket fields from an argparse result object."""

    # Severity 6 and above is informational only - never cut a ticket for it
    if ticket_severity >= 6:
        return

    cut_ticket_using_cloudwatch(
        ticket_description=ticket_description,
        ticket_reason=ticket_reason,
        ticket_severity=ticket_severity,
        ticket_allow_duplicates=False,
        ticket_category=arguments.ticket_category,
        ticket_type=arguments.ticket_type,
        ticket_item=arguments.ticket_item,
        ticket_group=arguments.ticket_group,
        git_repo_name=arguments.git_repo_name,
        git_hash=arguments.git_hash,
        git_hash_as_namespace=arguments.git_hash_as_namespace)