# blob: 8abc0a52d831c7f88beac3ea51c939dea07dcf99 [file] [log] [blame] [edit]
# Contains all of the classes that are shared across both the Canary Wrapper and the Persistent Canary Wrapper scripts
# If a class can/is reused, then it should be in this file.
# Needs to be installed prior to running
import boto3
import psutil
# Part of standard packages in Python 3.4+
import time
import os
import json
import subprocess
import zipfile
import datetime
# ================================================================================
# Class that holds metric data and has a few utility functions for getting that data in a format we can use for Cloudwatch
class DataSnapshot_Metric():
    """Holds one metric: its name, polling callback, Cloudwatch dimensions/unit and alarm settings.

    * metric_function is called with the psutil process and its result is cached in metric_value.
    * metric_dimensions defaults to a fresh empty list for every instance.
    * reports_to_skip is the number of Cloudwatch reports that return None before data is sent.
    """

    def __init__(self, metric_name, metric_function, metric_dimensions=None,
                 metric_unit="None", metric_alarm_threshold=None, metric_alarm_severity=6,
                 git_hash="", git_repo_name="", reports_to_skip=0, is_percent=False):
        self.metric_name = metric_name
        self.metric_function = metric_function
        # A literal [] default would be shared between every metric created without
        # explicit dimensions, so build a new list per instance instead.
        self.metric_dimensions = [] if metric_dimensions is None else metric_dimensions
        self.metric_unit = metric_unit
        self.metric_alarm_threshold = metric_alarm_threshold
        self.metric_alarm_name = self.metric_name + "-" + git_repo_name + "-" + git_hash
        self.metric_alarm_description = 'Alarm for metric "' + self.metric_name + '" - git hash: ' + git_hash
        self.metric_value = None
        self.reports_to_skip = reports_to_skip
        self.metric_alarm_severity = metric_alarm_severity
        self.is_percent = is_percent

    # Gets the latest metric value from the metric_function callback
    def get_metric_value(self, psutil_process: "psutil.Process"):
        """Refresh metric_value from metric_function (when one is set) and return it."""
        if self.metric_function is not None:
            self.metric_value = self.metric_function(psutil_process)
        return self.metric_value

    # Returns the data needed to send to Cloudwatch when posting metrics
    def get_metric_cloudwatch_dictionary(self):
        """Return the Cloudwatch metric dictionary, or None when this report must be skipped.

        A report is skipped while reports_to_skip is positive (the counter is
        decremented on each skipped call) or when no value has been collected yet.
        """
        if self.reports_to_skip > 0:
            self.reports_to_skip -= 1
            return None  # skips sending to Cloudwatch
        if self.metric_value is None:
            return None  # skips sending to Cloudwatch
        return {
            "MetricName": self.metric_name,
            "Dimensions": self.metric_dimensions,
            "Value": self.metric_value,
            "Unit": self.metric_unit
        }
class DataSnapshot_Dashboard_Widget():
    """Describes one Cloudwatch dashboard widget and the metric names it displays."""

    def __init__(self, widget_name, metric_namespace, metric_dimension, cloudwatch_region="us-east-1", widget_period=60) -> None:
        self.widget_name = widget_name
        self.widget_period = widget_period
        self.region = cloudwatch_region
        self.metric_namespace = metric_namespace
        self.metric_dimension = metric_dimension
        # Names of the metrics shown in this widget, in insertion order
        self.metric_list = []

    def add_metric_to_widget(self, new_metric_name):
        """Append a metric name to the widget; failures are printed, not raised."""
        try:
            self.metric_list.append(new_metric_name)
        except Exception as error:
            print ("[DataSnapshot_Dashboard] ERROR - could not add metric to dashboard widget due to exception!")
            print ("[DataSnapshot_Dashboard] Exception: " + str(error))

    def remove_metric_from_widget(self, existing_metric_name):
        """Remove a metric name from the widget; failures (e.g. unknown name) are printed, not raised."""
        try:
            self.metric_list.remove(existing_metric_name)
        except Exception as error:
            print ("[DataSnapshot_Dashboard] ERROR - could not remove metric from dashboard widget due to exception!")
            print ("[DataSnapshot_Dashboard] Exception: " + str(error))

    def get_widget_dictionary(self):
        """Return the widget as the dictionary that is serialized into the dashboard body."""
        # One [namespace, metric, dimension name, dimension value] row per metric
        metric_rows = [
            [self.metric_namespace, name, self.metric_dimension, name]
            for name in self.metric_list
        ]
        properties = {
            "metrics" : metric_rows,
            "region": self.region,
            "title": self.widget_name,
            "period": self.widget_period,
        }
        return {"type": "metric", "properties": properties, "width": 14, "height": 10}
# ================================================================================
# Class that keeps track of the metrics registered, sets up Cloudwatch and S3, and sends periodic reports
# Is the backbone of the reporting operation
class DataSnapshot():
def __init__(self,
             git_hash=None,
             git_repo_name=None,
             git_hash_as_namespace=False,
             git_fixed_namespace_text="mqtt5_canary",
             datetime_string=None,
             output_log_filepath=None,
             output_to_console=True,
             cloudwatch_region="us-east-1",
             cloudwatch_make_dashboard=False,
             cloudwatch_teardown_alarms_on_complete=True,
             cloudwatch_teardown_dashboard_on_complete=True,
             s3_bucket_name="canary-wrapper-bucket",
             s3_bucket_upload_on_complete=True,
             lambda_name="CanarySendEmailLambda",
             metric_frequency=None):
    """Initialise the snapshot state and create the AWS clients used for reporting.

    Setup runs in order: credential check (STS), Git/namespace setup, Cloudwatch
    client, S3 client, Lambda client, then the optional output log file. The first
    step that fails sets abort_due_to_internal_error (plus a reason) and returns
    immediately, so later steps are not performed.
    """
    def flag_abort(reason):
        # Shared bookkeeping for every early-exit failure below
        self.abort_due_to_internal_error = True
        self.abort_due_to_internal_error_reason = reason

    # ---- Metric bookkeeping ----
    self.first_metric_call = True
    self.metrics = []
    self.metrics_numbers = []
    self.metric_report_number = 0
    self.metric_report_non_zero_count = 4

    # True until the first post_metrics call has registered alarms/dashboard,
    # which lets metrics be registered after construction but before reporting.
    self.perform_final_initialization = True

    # ---- Error state (watched by the thread that created the snapshot) ----
    self.abort_due_to_internal_error = False
    self.abort_due_to_internal_error_reason = ""
    self.abort_due_to_internal_error_due_to_credentials = False

    # ---- Git / namespace ----
    self.git_hash = None
    self.git_repo_name = None
    self.git_hash_as_namespace = git_hash_as_namespace
    self.git_fixed_namespace_text = git_fixed_namespace_text
    self.git_metric_namespace = None

    # ---- Cloudwatch ----
    self.cloudwatch_region = cloudwatch_region
    self.cloudwatch_client = None
    self.cloudwatch_make_dashboard = cloudwatch_make_dashboard
    self.cloudwatch_teardown_alarms_on_complete = cloudwatch_teardown_alarms_on_complete
    self.cloudwatch_teardown_dashboard_on_complete = cloudwatch_teardown_dashboard_on_complete
    self.cloudwatch_dashboard_name = ""
    self.cloudwatch_dashboard_widgets = []

    # ---- S3 ----
    self.s3_bucket_name = s3_bucket_name
    self.s3_client = None
    self.s3_bucket_upload_on_complete = s3_bucket_upload_on_complete

    # ---- Output ----
    self.output_to_file_filepath = output_log_filepath
    self.output_to_file = False
    self.output_file = None
    self.output_to_console = output_to_console

    # ---- Lambda / misc ----
    self.lambda_client = None
    self.lambda_name = lambda_name
    self.datetime_string = datetime_string
    self.metric_frequency = metric_frequency

    # Step 1: make sure the AWS credentials are usable at all
    try:
        boto3.client('sts').get_caller_identity()
    except Exception:
        print ("[DataSnapshot] ERROR - AWS credentials are NOT valid!")
        flag_abort("AWS credentials are NOT valid!")
        self.abort_due_to_internal_error_due_to_credentials = True
        return

    # Step 2: Git hash and repository name are mandatory
    if git_hash is None or git_repo_name is None:
        print("[DataSnapshot] ERROR - a Git hash and repository name are REQUIRED for the canary wrapper to run!")
        flag_abort("No Git hash and repository passed!")
        return
    self.git_hash = git_hash
    self.git_repo_name = git_repo_name

    if (self.git_hash_as_namespace != False):
        # Namespace is "<repo>-<hash>" or "<repo>/<datetime>-<hash>"
        if self.datetime_string is None:
            separator = "-"
        else:
            separator = "/" + self.datetime_string + "-"
        self.git_metric_namespace = self.git_repo_name + separator + self.git_hash
    else:
        self.git_metric_namespace = self.git_fixed_namespace_text

    # Step 3: Cloudwatch client (the dashboard is named after the namespace)
    try:
        self.cloudwatch_client = boto3.client('cloudwatch', self.cloudwatch_region)
        self.cloudwatch_dashboard_name = self.git_metric_namespace
    except Exception as error:
        self.print_message("[DataSnapshot] ERROR - could not make Cloudwatch client due to exception!")
        self.print_message("[DataSnapshot] Exception: " + str(error))
        self.cloudwatch_client = None
        flag_abort("Could not make Cloudwatch client!")
        return

    # Step 4: S3 client
    try:
        self.s3_client = boto3.client("s3")
    except Exception as error:
        self.print_message("[DataSnapshot] ERROR - could not make S3 client due to exception!")
        self.print_message("[DataSnapshot] Exception: " + str(error))
        self.s3_client = None
        flag_abort("Could not make S3 client!")
        return

    # Step 5: Lambda client (same region as Cloudwatch)
    try:
        self.lambda_client = boto3.client("lambda", self.cloudwatch_region)
    except Exception as error:
        self.print_message("[DataSnapshot] ERROR - could not make Lambda client due to exception!")
        self.print_message("[DataSnapshot] Exception: " + str(error))
        self.lambda_client = None
        flag_abort("Could not make Lambda client!")
        return

    # Step 6: optional log file - opened last, so earlier messages only reach the console
    if output_log_filepath is None:
        self.output_to_file = False
        self.output_file = None
    else:
        self.output_to_file = True
        self.output_file = open(self.output_to_file_filepath, "w")

    self.print_message("[DataSnapshot] Data snapshot created!")
# Cleans the class - closing any files, removing alarms, and sending data to S3.
# Should be called at the end when you are totally finished shadowing metrics
def cleanup(self, error_occurred=False):
if (self.s3_bucket_upload_on_complete == True):
self.export_result_to_s3_bucket(copy_output_log=True, log_is_error=error_occurred)
self._cleanup_cloudwatch_alarms()
if (self.cloudwatch_make_dashboard == True):
self._cleanup_cloudwatch_dashboard()
self.print_message("[DataSnapshot] Data snapshot cleaned!")
if (self.output_file is not None):
self.output_file.close()
self.output_file = None
# Utility function for printing messages
def print_message(self, message):
if self.output_to_file == True:
self.output_file.write(message + "\n")
if self.output_to_console == True:
print(message, flush=True)
# Utility function - adds the metric alarms to Cloudwatch. We do run this right before the first
# collection of metrics so we can register metrics before we initialize Cloudwatch
def _init_cloudwatch_pre_first_run(self):
for metric in self.metrics:
if (not metric.metric_alarm_threshold is None):
self._add_cloudwatch_metric_alarm(metric)
if (self.cloudwatch_make_dashboard == True):
self._init_cloudwatch_pre_first_run_dashboard()
# Utility function - adds the Cloudwatch Dashboard for the currently running data snapshot
def _init_cloudwatch_pre_first_run_dashboard(self):
try:
# Remove the old dashboard if it exists before adding a new one
self._cleanup_cloudwatch_dashboard()
new_dashboard_widgets_array = []
for widget in self.cloudwatch_dashboard_widgets:
new_dashboard_widgets_array.append(widget.get_widget_dictionary())
new_dashboard_body = {
"start": "-PT1H",
"widgets": new_dashboard_widgets_array,
}
new_dashboard_body_json = json.dumps(new_dashboard_body)
self.cloudwatch_client.put_dashboard(
DashboardName=self.cloudwatch_dashboard_name,
DashboardBody= new_dashboard_body_json)
self.print_message("[DataSnapshot] Added Cloudwatch dashboard successfully")
except Exception as e:
self.print_message("[DataSnapshot] ERROR - Cloudwatch client could not make dashboard due to exception!")
self.print_message("[DataSnapshot] Exception: " + str(e))
self.abort_due_to_internal_error = True
self.abort_due_to_internal_error_reason = "Cloudwatch client could not make dashboard due to exception"
return
# Utility function - The function that adds each individual metric alarm.
def _add_cloudwatch_metric_alarm(self, metric):
if self.cloudwatch_client is None:
self.print_message("[DataSnapshot] ERROR - Cloudwatch client not setup. Cannot register alarm")
return
try:
self.cloudwatch_client.put_metric_alarm(
AlarmName=metric.metric_alarm_name,
AlarmDescription=metric.metric_alarm_description,
MetricName=metric.metric_name,
Namespace=self.git_metric_namespace,
Statistic="Maximum",
Dimensions=metric.metric_dimensions,
Period=60, # How long (in seconds) is an evaluation period?
EvaluationPeriods=120, # How many periods does it need to be invalid for?
DatapointsToAlarm=1, # How many data points need to be invalid?
Threshold=metric.metric_alarm_threshold,
ComparisonOperator="GreaterThanOrEqualToThreshold",
)
except Exception as e:
self.print_message("[DataSnapshot] ERROR - could not register alarm for metric due to exception: " + metric.metric_name)
self.print_message("[DataSnapshot] Exception: " + str(e))
# Utility function - removes all the Cloudwatch alarms for the metrics
def _cleanup_cloudwatch_alarms(self):
if (self.cloudwatch_teardown_alarms_on_complete == True):
try:
for metric in self.metrics:
if (not metric.metric_alarm_threshold is None):
self.cloudwatch_client.delete_alarms(AlarmNames=[metric.metric_alarm_name])
except Exception as e:
self.print_message("[DataSnapshot] ERROR - could not delete alarms due to exception!")
self.print_message("[DataSnapshot] Exception: " + str(e))
# Utility function - removes all Cloudwatch dashboards created
def _cleanup_cloudwatch_dashboard(self):
if (self.cloudwatch_teardown_dashboard_on_complete == True):
try:
self.cloudwatch_client.delete_dashboards(DashboardNames=[self.cloudwatch_dashboard_name])
self.print_message("[DataSnapshot] Cloudwatch Dashboards deleted successfully!")
except Exception as e:
self.print_message("[DataSnapshot] ERROR - dashboard cleaning function failed due to exception!")
self.print_message("[DataSnapshot] Exception: " + str(e))
self.abort_due_to_internal_error = True
self.abort_due_to_internal_error_reason = "Cloudwatch dashboard cleaning function failed due to exception"
return
# Returns the results of the metric alarms. Will return a list containing tuples with the following structure:
# [Boolean (False = the alarm is in the ALARM state), String (Name of the alarm that is in the ALARM state), int (severity of alarm)]
# Currently this function will only return a list of failed alarms, so if the returned list is empty, then it means all
# alarms did not get to the ALARM state in Cloudwatch for the registered metrics
def get_cloudwatch_alarm_results(self):
return self._check_cloudwatch_alarm_states()
# Utility function - collects the metric alarm results and returns them in a list.
def _check_cloudwatch_alarm_states(self):
return_result_list = []
tmp = None
for metric in self.metrics:
tmp = self._check_cloudwatch_alarm_state_metric(metric)
if (tmp[1] != None):
# Do not cut a ticket for the "Alive_Alarm" that we use to check if the Canary is running
if ("Alive_Alarm" in tmp[1] == False):
if (tmp[0] != True):
return_result_list.append(tmp)
return return_result_list
# Utility function - checks each individual alarm and returns a tuple with the following format:
# [Boolean (False if the alarm is in the ALARM state, otherwise it is true), String (name of the alarm), Int (severity of alarm)]
def _check_cloudwatch_alarm_state_metric(self, metric):
alarms_response = self.cloudwatch_client.describe_alarms_for_metric(
MetricName=metric.metric_name,
Namespace=self.git_metric_namespace,
Dimensions=metric.metric_dimensions)
return_result = [True, None, metric.metric_alarm_severity]
for metric_alarm_dict in alarms_response["MetricAlarms"]:
if metric_alarm_dict["StateValue"] == "ALARM":
return_result[0] = False
return_result[1] = metric_alarm_dict["AlarmName"]
break
return return_result
# Exports a file with the same name as the commit Git hash to an S3 bucket in a folder with the Git repo name.
# By default, this file will only contain the Git hash.
# If copy_output_log is true, then the output log will be copied into this file, which may be useful for debugging.
def export_result_to_s3_bucket(self, copy_output_log=False, log_is_error=False):
if (self.s3_client is None):
self.print_message("[DataSnapshot] ERROR - No S3 client initialized! Cannot send log to S3")
self.abort_due_to_internal_error = True
self.abort_due_to_internal_error_reason = "S3 client not initialized and therefore cannot send log to S3"
return
s3_file = open(self.git_hash + ".log", "w")
s3_file.write(self.git_hash)
# Might be useful for debugging?
if (copy_output_log == True and self.output_to_file == True):
# Are we still writing? If so, then we need to close the file first so everything is written to it
is_output_file_open_previously = False
if (self.output_file != None):
self.output_file.close()
is_output_file_open_previously = True
self.output_file = open(self.output_to_file_filepath, "r")
s3_file.write("\n\nOUTPUT LOG\n")
s3_file.write("==========================================================================================\n")
output_file_lines = self.output_file.readlines()
for line in output_file_lines:
s3_file.write(line)
self.output_file.close()
# If we were writing to the output previously, then we need to open in RW mode so we can continue to write to it
if (is_output_file_open_previously == True):
self.output_to_file = open(self.output_to_file_filepath, "a")
s3_file.close()
# Upload to S3
try:
if (log_is_error == False):
if (self.datetime_string == None):
self.s3_client.upload_file(self.git_hash + ".log", self.s3_bucket_name, self.git_repo_name + "/" + self.git_hash + ".log")
else:
self.s3_client.upload_file(self.git_hash + ".log", self.s3_bucket_name, self.git_repo_name + "/" + self.datetime_string + "/" + self.git_hash + ".log")
else:
if (self.datetime_string == None):
self.s3_client.upload_file(self.git_hash + ".log", self.s3_bucket_name, self.git_repo_name + "/Failed_Logs/" + self.git_hash + ".log")
else:
self.s3_client.upload_file(self.git_hash + ".log", self.s3_bucket_name, self.git_repo_name + "/Failed_Logs/" + self.datetime_string + "/" + self.git_hash + ".log")
self.print_message("[DataSnapshot] Uploaded to S3!")
except Exception as e:
self.print_message("[DataSnapshot] ERROR - could not upload to S3 due to exception!")
self.print_message("[DataSnapshot] Exception: " + str(e))
self.abort_due_to_internal_error = True
self.abort_due_to_internal_error_reason = "S3 client had exception and therefore could not upload log!"
os.remove(self.git_hash + ".log")
return
# Delete the file when finished
os.remove(self.git_hash + ".log")
# Sends an email via a special lambda. The payload has to contain a message and a subject
# * (REQUIRED) message is the message you want to send in the body of the email
# * (REQUIRED) subject is the subject that the email will be sent with
def lambda_send_email(self, message, subject):
payload = {"Message":message, "Subject":subject}
payload_string = json.dumps(payload)
try:
self.lambda_client.invoke(
FunctionName=self.lambda_name,
InvocationType="Event",
ClientContext="MQTT Wrapper Script",
Payload=payload_string
)
except Exception as e:
self.print_message("[DataSnapshot] ERROR - could not send email via Lambda due to exception!")
self.print_message("[DataSnapshot] Exception: " + str(e))
self.abort_due_to_internal_error = True
self.abort_due_to_internal_error_reason = "Lambda email function had an exception!"
return
# Registers a metric to be polled by the Snapshot.
# * (REQUIRED) new_metric_name is the name of the metric. Cloudwatch will use this name
# * (REQUIRED) new_metric_function is expected to be a pointer to a Python function and will not work if you pass a value/object
# * (OPTIONAL) new_metric_unit is the metric unit. There is a list of possible metric unit types on the Boto3 documentation for Cloudwatch
# * (OPTIONAL) new_metric_alarm_threshold is the value that the metric has to exceed in order to be registered as an alarm
# * (OPTIONAL) new_reports_to_skip is the number of reports for which this metric will return nothing, while still collecting its value.
# * Useful for CPU calculations that require deltas
# * (OPTIONAL) new_metric_alarm_severity is the severity of the ticket if this alarm is triggered. A severity of 6+ means no ticket.
# * (OPTIONAL) is_percent whether or not to display the metric as a percent when printing it (default=false)
def register_metric(self, new_metric_name, new_metric_function, new_metric_unit="None",
                    new_metric_alarm_threshold=None, new_metric_reports_to_skip=0, new_metric_alarm_severity=6, is_percent=False):
    """Create a DataSnapshot_Metric for the given settings and add it to the polled metrics.

    The metric gets a single dimension whose value is the metric name; the
    dimension name is "<repo>-<hash>" when the fixed namespace is in use and
    "System_Metrics" otherwise.
    """
    if (self.git_hash_as_namespace == False):
        dimension_name = self.git_repo_name + "-" + self.git_hash
    else:
        dimension_name = "System_Metrics"
    dimensions = [{"Name": dimension_name, "Value": new_metric_name}]

    self.metrics.append(DataSnapshot_Metric(
        metric_name=new_metric_name,
        metric_function=new_metric_function,
        metric_dimensions=dimensions,
        metric_unit=new_metric_unit,
        metric_alarm_threshold=new_metric_alarm_threshold,
        metric_alarm_severity=new_metric_alarm_severity,
        git_hash=self.git_hash,
        git_repo_name=self.git_repo_name,
        reports_to_skip=new_metric_reports_to_skip,
        is_percent=is_percent
    ))
    # Matching (initially empty) history list used to track the metric's values over time
    self.metrics_numbers.append([])
def register_dashboard_widget(self, new_widget_name, metrics_to_add=[], new_widget_period=60):
# We need to know what metric dimension to get the metric(s) from
metric_dimension_string = ""
if (self.git_hash_as_namespace == False):
metric_dimension_string = self.git_repo_name + "-" + self.git_hash
else:
metric_dimension_string = "System_Metrics"
widget = self._find_cloudwatch_widget(name=new_widget_name)
if (widget == None):
widget = DataSnapshot_Dashboard_Widget(
widget_name=new_widget_name, metric_namespace=self.git_metric_namespace,
metric_dimension=metric_dimension_string,
cloudwatch_region=self.cloudwatch_region,
widget_period=new_widget_period)
self.cloudwatch_dashboard_widgets.append(widget)
for metric in metrics_to_add:
self.register_metric_to_dashboard_widget(widget_name=new_widget_name, metric_name=metric)
def register_metric_to_dashboard_widget(self, widget_name, metric_name, widget=None):
if widget is None:
widget = self._find_cloudwatch_widget(name=widget_name)
if widget is None:
print ("[DataSnapshot] ERROR - could not find widget with name: " + widget_name, flush=True)
return
# Adjust metric name so it has the git hash, repo, etc
metric_name_formatted = metric_name
widget.add_metric_to_widget(new_metric_name=metric_name_formatted)
return
def remove_metric_from_dashboard_widget(self, widget_name, metric_name, widget=None):
if widget is None:
widget = self._find_cloudwatch_widget(name=widget_name)
if widget is None:
print ("[DataSnapshot] ERROR - could not find widget with name: " + widget_name, flush=True)
return
widget.remove_metric_from_widget(existing_metric_name=metric_name)
return
def _find_cloudwatch_widget(self, name):
result = None
for widget in self.cloudwatch_dashboard_widgets:
if widget.widget_name == name:
return widget
return result
# Prints the metrics to the console
def export_metrics_console(self):
datetime_now = datetime.datetime.now()
datetime_string = datetime_now.strftime("%d-%m-%Y/%H:%M:%S")
self.print_message("\n[DataSnapshot] Metric report: " + str(self.metric_report_number) + " (" + datetime_string + ")")
for metric in self.metrics:
if (metric.is_percent == True):
self.print_message(" " + metric.metric_name +
" - value: " + str(metric.metric_value) + "%")
else:
self.print_message(" " + metric.metric_name +
" - value: " + str(metric.metric_value))
self.print_message("")
# Sends all registered metrics to Cloudwatch.
# Does NOT need to be called in a loop. Call post_metrics in a loop to send all the metrics as expected.
# This is just the Cloudwatch part of that loop.
def export_metrics_cloudwatch(self):
if (self.cloudwatch_client == None):
self.print_message("[DataSnapshot] Error - cannot export Cloudwatch metrics! Cloudwatch was not initialized.")
self.abort_due_to_internal_error = True
self.abort_due_to_internal_error_reason = "Could not export Cloudwatch metrics due to no Cloudwatch client initialized!"
return
self.print_message("[DataSnapshot] Preparing to send to Cloudwatch...")
metrics_data = []
metric_data_tmp = None
for metric in self.metrics:
metric_data_tmp = metric.get_metric_cloudwatch_dictionary()
if (not metric_data_tmp is None):
metrics_data.append(metric_data_tmp)
if (len(metrics_data) == 0):
self.print_message("[DataSnapshot] INFO - no metric data to send. Skipping...")
return
try:
self.cloudwatch_client.put_metric_data(
Namespace=self.git_metric_namespace,
MetricData=metrics_data)
self.print_message("[DataSnapshot] Metrics sent to Cloudwatch.")
except Exception as e:
self.print_message("[DataSnapshot] Error - something when wrong posting cloudwatch metrics!")
self.print_message("[DataSnapshot] Exception: " + str(e))
self.abort_due_to_internal_error = True
self.abort_due_to_internal_error_reason = "Could not export Cloudwatch metrics due to exception in Cloudwatch client!"
return
# Call this at a set interval to post the metrics to Cloudwatch, etc.
# This is the function you want to call repeatedly after you have everything setup.
def post_metrics(self, psutil_process : psutil.Process):
if (self.perform_final_initialization == True):
self.perform_final_initialization = False
self._init_cloudwatch_pre_first_run()
# Update the metric values internally
for i in range(0, len(self.metrics)):
metric_value = self.metrics[i].get_metric_value(psutil_process)
self.metrics_numbers[i].insert(0, metric_value)
# Only keep the last metric_report_non_zero_count results
if (len(self.metrics_numbers[i]) > self.metric_report_non_zero_count):
amount_to_delete = len(self.metrics_numbers[i]) - self.metric_report_non_zero_count
del self.metrics_numbers[i][-amount_to_delete:]
# If we have metric_report_non_zero_count amount of metrics, make sure there is at least one
# non-zero. If it is all zero, then print a log so we can easily find it
if (len(self.metrics_numbers[i]) == self.metric_report_non_zero_count):
non_zero_found = False
for j in range(0, len(self.metrics_numbers[i])):
if (self.metrics_numbers[i][j] != 0.0 and self.metrics_numbers[i][j] != None):
non_zero_found = True
break
if (non_zero_found == False):
self.print_message("\n[DataSnapshot] METRIC ZERO ERROR!")
self.print_message(f"[DataSnapshot] Metric index {i} has been zero for last {self.metric_report_non_zero_count} reports!")
self.print_message("\n")
self.metric_report_number += 1
self.export_metrics_console()
self.export_metrics_cloudwatch()
def output_diagnosis_information(self, dependencies_list):
# Print general diagnosis information
self.print_message("\n========== Canary Wrapper diagnosis information ==========")
self.print_message("\nRunning Canary for repository: " + self.git_repo_name)
self.print_message("\t Commit hash: " + self.git_hash)
if not dependencies_list == "":
self.print_message("\nDependencies:")
dependencies_list = dependencies_list.split(";")
dependencies_list_found_hash = False
for i in range(0, len(dependencies_list)):
# There's probably a better way to do this...
if (dependencies_list_found_hash == True):
dependencies_list_found_hash = False
continue
self.print_message("* " + dependencies_list[i])
if (i+1 < len(dependencies_list)):
self.print_message("\t Commit hash: " + dependencies_list[i+1])
dependencies_list_found_hash = True
else:
self.print_message("\t Commit hash: Unknown")
if (self.metric_frequency != None):
self.print_message("\nMetric Snapshot Frequency: " + str(self.metric_frequency) + " seconds")
self.print_message("\nMetrics:")
for metric in self.metrics:
self.print_message("* " + metric.metric_name)
if metric.metric_alarm_threshold is not None:
self.print_message("\t Alarm Threshold: " + str(metric.metric_alarm_threshold))
self.print_message("\t Alarm Severity: " + str(metric.metric_alarm_severity))
else:
self.print_message("\t No alarm set for metric.")
self.print_message("\n")
self.print_message("==========================================================")
self.print_message("\n")
# ================================================================================
class SnapshotMonitor():
def __init__(self, wrapper_data_snapshot, wrapper_metrics_wait_time) -> None:
self.data_snapshot = wrapper_data_snapshot
self.had_internal_error = False
self.error_due_to_credentials = False
self.internal_error_reason = ""
self.error_due_to_alarm = False
self.can_cut_ticket = False
self.has_cut_ticket = False
# A list of all the alarms triggered in the last check, cached for later
# NOTE - this is only the alarm names! Not the severity. This just makes it easier to process
self.cloudwatch_current_alarms_triggered = []
# Check for errors
if (self.data_snapshot.abort_due_to_internal_error == True):
self.had_internal_error = True
self.internal_error_reason = "Could not initialize DataSnapshot. Likely credentials are not setup!"
if (self.data_snapshot.abort_due_to_internal_error_due_to_credentials == True):
self.error_due_to_credentials = True
self.data_snapshot.cleanup()
return
# How long to wait before posting a metric
self.metric_post_timer = 0
self.metric_post_timer_time = wrapper_metrics_wait_time
def register_metric(self, new_metric_name, new_metric_function, new_metric_unit="None", new_metric_alarm_threshold=None,
new_metric_reports_to_skip=0, new_metric_alarm_severity=6):
try:
self.data_snapshot.register_metric(
new_metric_name=new_metric_name,
new_metric_function=new_metric_function,
new_metric_unit=new_metric_unit,
new_metric_alarm_threshold=new_metric_alarm_threshold,
new_metric_reports_to_skip=new_metric_reports_to_skip,
new_metric_alarm_severity=new_metric_alarm_severity)
except Exception as e:
self.print_message("[SnaptshotMonitor] ERROR - could not register metric in data snapshot due to exception!")
self.print_message("[SnaptshotMonitor] Exception: " + str(e))
self.had_internal_error = True
self.internal_error_reason = "Could not register metric in data snapshot due to exception"
return
def register_dashboard_widget(self, new_widget_name, metrics_to_add=[], widget_period=60):
self.data_snapshot.register_dashboard_widget(new_widget_name=new_widget_name, metrics_to_add=metrics_to_add, new_widget_period=widget_period)
def output_diagnosis_information(self, dependencies=""):
self.data_snapshot.output_diagnosis_information(dependencies_list=dependencies)
def check_alarms_for_new_alarms(self, triggered_alarms):
if len(triggered_alarms) > 0:
self.data_snapshot.print_message(
"WARNING - One or more alarms are in state of ALARM")
old_alarms_still_active = []
new_alarms = []
new_alarms_highest_severity = 6
new_alarm_found = True
new_alarm_ticket_description = "Canary has metrics in ALARM state!\n\nMetrics in alarm:\n"
for triggered_alarm in triggered_alarms:
new_alarm_found = True
# Is this a new alarm?
for old_alarm_name in self.cloudwatch_current_alarms_triggered:
if (old_alarm_name == triggered_alarm[1]):
new_alarm_found = False
old_alarms_still_active.append(triggered_alarm[1])
new_alarm_ticket_description += "* (STILL IN ALARM) " + triggered_alarm[1] + "\n"
new_alarm_ticket_description += "\tSeverity: " + str(triggered_alarm[2])
new_alarm_ticket_description += "\n"
break
# If it is a new alarm, then add it to our list so we can cut a new ticket
if (new_alarm_found == True):
self.data_snapshot.print_message(' (NEW) Alarm with name "' + triggered_alarm[1] + '" is in the ALARM state!')
new_alarms.append(triggered_alarm[1])
if (triggered_alarm[2] < new_alarms_highest_severity):
new_alarms_highest_severity = triggered_alarm[2]
new_alarm_ticket_description += "* " + triggered_alarm[1] + "\n"
new_alarm_ticket_description += "\tSeverity: " + str(triggered_alarm[2])
new_alarm_ticket_description += "\n"
if len(new_alarms) > 0:
if (self.can_cut_ticket == True):
cut_ticket_using_cloudwatch(
git_repo_name=self.data_snapshot.git_repo_name,
git_hash=self.data_snapshot.git_hash,
git_hash_as_namespace=False,
git_fixed_namespace_text=self.data_snapshot.git_fixed_namespace_text,
cloudwatch_region="us-east-1",
ticket_description="New metric(s) went into alarm for the Canary! Metrics in alarm: " + str(new_alarms),
ticket_reason="New metric(s) went into alarm",
ticket_allow_duplicates=True,
ticket_category="AWS",
ticket_item="IoT SDK for CPP",
ticket_group="AWS IoT Device SDK",
ticket_type="SDKs and Tools",
ticket_severity=4)
self.has_cut_ticket = True
# Cache the new alarms and the old alarms
self.cloudwatch_current_alarms_triggered = old_alarms_still_active + new_alarms
else:
self.cloudwatch_current_alarms_triggered.clear()
def monitor_loop_function(self, psutil_process : psutil.Process, time_passed=30):
    """Run one monitoring tick: poll the metric alarms, then post metrics once the post timer runs out.

    Args:
        psutil_process: Process handle passed through to data_snapshot.post_metrics.
        time_passed: Amount subtracted from metric_post_timer on this tick
            (presumably the seconds elapsed since the previous call - confirm with the caller).

    Failures while polling alarms or posting metrics are not raised; they are
    recorded in had_internal_error / internal_error_reason and end the tick early.
    """
    # Check for internal errors reported by the data snapshot itself
    if (self.data_snapshot.abort_due_to_internal_error == True):
        self.had_internal_error = True
        self.internal_error_reason = "Data Snapshot internal error: " + self.data_snapshot.abort_due_to_internal_error_reason
        return

    try:
        # Poll the metric alarms
        if (self.had_internal_error == False):
            # Get a report of all the alarms that might have been set to an alarm state
            triggered_alarms = self.data_snapshot.get_cloudwatch_alarm_results()
            self.check_alarms_for_new_alarms(triggered_alarms)
    except Exception as e:
        self.print_message("[SnaptshotMonitor] ERROR - exception occurred checking metric alarms!")
        self.print_message("[SnaptshotMonitor] (Likely session credentials expired)")
        # Log the real exception as well, so a non-credential failure is not misdiagnosed
        self.print_message("[SnaptshotMonitor] Exception: " + str(e))
        self.had_internal_error = True
        self.internal_error_reason = "Exception occurred checking metric alarms! Likely session credentials expired"
        return

    # Post the metrics only when the countdown has run out
    if (self.metric_post_timer <= 0):
        if (self.had_internal_error == False):
            try:
                self.data_snapshot.post_metrics(psutil_process)
            except Exception as e:
                self.print_message("[SnaptshotMonitor] ERROR - exception occurred posting metrics!")
                self.print_message("[SnaptshotMonitor] (Likely session credentials expired)")
                # Routed through print_message (not a bare print) so it lands with the other log lines
                self.print_message("[SnaptshotMonitor] Exception: " + str(e))
                self.had_internal_error = True
                self.internal_error_reason = "Exception occurred posting metrics! Likely session credentials expired"
                return
        # reset the timer
        self.metric_post_timer += self.metric_post_timer_time

    # Count this tick down from the timer
    self.metric_post_timer -= time_passed
def send_email(self, email_body, email_subject_text_append=None):
    """Send email_body via data_snapshot.lambda_send_email with a "Canary: <repo>:<hash>" subject.

    When email_subject_text_append is given, it is added to the subject after " - ".
    """
    snapshot = self.data_snapshot
    subject = "Canary: " + snapshot.git_repo_name + ":" + snapshot.git_hash
    if email_subject_text_append is not None:
        subject = subject + " - " + email_subject_text_append
    snapshot.lambda_send_email(email_body, subject)
def stop_monitoring(self):
    """Do nothing - stub kept only so this monitor has the same methods as the other monitors."""
    return None
def start_monitoring(self):
    """Do nothing - stub kept only so this monitor has the same methods as the other monitors."""
    return None
def restart_monitoring(self):
    """Do nothing - stub kept only so this monitor has the same methods as the other monitors."""
    return None
def cleanup_monitor(self, error_occurred=False):
    """Forward cleanup to the data snapshot, passing along whether an error occurred."""
    snapshot = self.data_snapshot
    snapshot.cleanup(error_occurred=error_occurred)
def print_message(self, message):
    """Hand the message to the data snapshot's print_message, or print it to stdout when there is no snapshot."""
    snapshot = self.data_snapshot
    if snapshot is None:
        print(message, flush=True)
        return
    snapshot.print_message(message)
# ================================================================================
class ApplicationMonitor():
    """Launches the wrapped Canary/application as a subprocess and tracks whether it is still running.

    Failures are not raised to the caller; they are reported through the
    error_has_occurred / error_reason / error_code attributes.
    """

    def __init__(self, wrapper_application_path, wrapper_application_arguments, wrapper_application_restart_on_finish=True, data_snapshot=None) -> None:
        # What to launch and what to do once it exits cleanly
        self.wrapper_application_path = wrapper_application_path
        self.wrapper_application_arguments = wrapper_application_arguments
        self.wrapper_application_restart_on_finish = wrapper_application_restart_on_finish
        # Optional log sink - see print_message
        self.data_snapshot = data_snapshot
        # File the application output is tee'd into while it runs
        self.stdout_file_path = "Canary_Stdout_File.txt"
        # Process handles, filled in by start_monitoring
        self.application_process = None
        self.application_process_psutil = None
        # Error state polled by the caller
        self.error_has_occurred = False
        self.error_due_to_credentials = False
        self.error_reason = ""
        self.error_code = 0

    def _record_error(self, reason, code):
        # Shared bookkeeping for every failure path in this class
        self.error_has_occurred = True
        self.error_reason = reason
        self.error_code = code

    def start_monitoring(self):
        """Launch the application through the shell, piping its output into stdout_file_path via tee."""
        self.print_message("[ApplicationMonitor] Starting to monitor application...")

        # Only one application may be tracked at a time
        if self.application_process is not None:
            self.print_message("[ApplicationMonitor] ERROR - Monitor already has an application process! Cannot monitor two applications with one monitor class!")
            return

        try:
            launch_command = self.wrapper_application_path + " " + self.wrapper_application_arguments + " | tee " + self.stdout_file_path
            # NOTE(review): with shell=True and a "| tee" pipeline, the pid (and later the
            # return code) presumably belong to the shell pipeline rather than the
            # application itself - confirm that crash detection sees the real exit code.
            self.application_process = subprocess.Popen(launch_command, shell=True)
            self.application_process_psutil = psutil.Process(self.application_process.pid)
            self.print_message ("[ApplicationMonitor] Application started...")
        except Exception as e:
            self.print_message ("[ApplicationMonitor] ERROR - Could not launch Canary/Application due to exception!")
            self.print_message ("[ApplicationMonitor] Exception: " + str(e))
            self._record_error("Could not launch Canary/Application due to exception", 1)
            return

    def restart_monitoring(self):
        """Stop the tracked application and start it again; records an error if none was ever started."""
        self.print_message ("[ApplicationMonitor] Restarting monitor application...")

        if self.application_process is None:
            self.print_message("[ApplicationMonitor] ERROR - Application process restart called but process is/was not running!")
            self._record_error("Could not restart Canary/Application due to application process not being started initially", 1)
            return

        try:
            self.stop_monitoring()
            self.start_monitoring()
            self.print_message("\n[ApplicationMonitor] Restarted monitor application!")
            self.print_message("================================================================================")
        except Exception as e:
            self.print_message("[ApplicationMonitor] ERROR - Could not restart Canary/Application due to exception!")
            self.print_message("[ApplicationMonitor] Exception: " + str(e))
            self._record_error("Could not restart Canary/Application due to exception", 1)
            return

    def stop_monitoring(self):
        """Terminate the tracked process, wait for it to exit, then dump its captured STDOUT."""
        self.print_message ("[ApplicationMonitor] Stopping monitor application...")

        running_process = self.application_process
        if running_process is None:
            self.print_message ("[ApplicationMonitor] ERROR - cannot stop monitor application because no process is found!")
            return

        running_process.terminate()
        running_process.wait()
        self.print_message ("[ApplicationMonitor] Stopped monitor application!")
        self.application_process = None
        self.print_stdout()

    def print_stdout(self):
        """Print the contents of the STDOUT capture file (if it exists) and then delete the file."""
        if not os.path.isfile(self.stdout_file_path):
            return
        self.print_message("Just finished Application STDOUT: ")
        with open(self.stdout_file_path, "r") as stdout_file:
            captured_output = stdout_file.read()
            self.print_message(captured_output)
        os.remove(self.stdout_file_path)

    def monitor_loop_function(self, time_passed=30):
        """Poll the tracked process once and react if it has exited.

        A non-zero exit code is recorded as a crash. A zero exit code either
        restarts the application or records "finished", depending on
        wrapper_application_restart_on_finish. time_passed is not used here.
        """
        if self.application_process is None:
            return

        try:
            exit_code = self.application_process.poll()
        except Exception as e:
            self.print_message("[ApplicationMonitor] ERROR - exception occurred while trying to poll application status!")
            self.print_message("[ApplicationMonitor] Exception: " + str(e))
            self._record_error("Exception when polling application status", 1)
            return

        # poll() yields None for as long as the process is alive
        if exit_code is None:
            self.print_message("[ApplicationMonitor] Monitor application is still running...")
            return

        self.print_message("[ApplicationMonitor] Monitor application has stopped! Processing result...")

        if exit_code != 0:
            self.print_message("[ApplicationMonitor] ERROR - Something Crashed in Canary/Application!")
            self.print_message("[ApplicationMonitor] Error code: " + str(exit_code))
            self._record_error("Canary application crashed!", exit_code)
            return

        # Clean exit: should we restart?
        if (self.wrapper_application_restart_on_finish == True):
            self.print_message("[ApplicationMonitor] NOTE - Canary finished running and is restarting...")
            self.restart_monitoring()
        else:
            self.print_message("[ApplicationMonitor] Monitor application has stopped and monitor is not supposed to restart... Finishing...")
            # A normal finish is also signalled through the error flag, with code 0
            self._record_error("Canary Application Finished", 0)

    def cleanup_monitor(self, error_occurred=False):
        """Nothing to clean up for this monitor."""
        return None

    def print_message(self, message):
        """Send the message to the data snapshot's print_message, or to stdout when no snapshot is set."""
        snapshot = self.data_snapshot
        if snapshot is None:
            print(message, flush=True)
            return
        snapshot.print_message(message)
# ================================================================================
class S3Monitor():
    """Watches one S3 object for new versions and installs the downloaded file over the local application.

    Problems are reported through had_internal_error / internal_error_reason
    (plus error_due_to_credentials for invalid AWS credentials) rather than raised.
    """

    def __init__(self, s3_bucket_name, s3_file_name, s3_file_name_in_zip, canary_local_application_path, data_snapshot) -> None:
        """Store the configuration, verify the AWS credentials and create the S3 client.

        Args:
            s3_bucket_name: Bucket that holds the file.
            s3_file_name: Key of the file inside the bucket.
            s3_file_name_in_zip: Path of the application inside the downloaded zip,
                or None when the S3 file is not a zip.
            canary_local_application_path: Local path the new file is moved to.
            data_snapshot: Object whose print_message receives log output; may be None.
        """
        self.s3_client = None
        # Version information of the file seen on the last check
        self.s3_current_object_version_id = None
        self.s3_current_object_last_modified = None
        self.s3_bucket_name = s3_bucket_name
        self.s3_file_name = s3_file_name
        self.s3_file_name_only_path, self.s3_file_name_only_extension = os.path.splitext(s3_file_name)
        self.data_snapshot = data_snapshot
        self.canary_local_application_path = canary_local_application_path

        self.s3_file_name_in_zip = s3_file_name_in_zip
        self.s3_file_name_in_zip_only_path = None
        self.s3_file_name_in_zip_only_extension = None
        if (self.s3_file_name_in_zip != None):
            self.s3_file_name_in_zip_only_path, self.s3_file_name_in_zip_only_extension = os.path.splitext(s3_file_name_in_zip)

        # Set by check_for_file_change, cleared by replace_current_file_for_new_file
        self.s3_file_needs_replacing = False

        self.had_internal_error = False
        self.error_due_to_credentials = False
        self.internal_error_reason = ""

        # Check for valid credentials
        # ==================
        try:
            tmp_sts_client = boto3.client('sts')
            tmp_sts_client.get_caller_identity()
        except Exception as e:
            self.print_message("[S3Monitor] ERROR - (S3 Check) AWS credentials are NOT valid!")
            self.print_message("[S3Monitor] Exception: " + str(e))
            self.had_internal_error = True
            self.error_due_to_credentials = True
            self.internal_error_reason = "AWS credentials are NOT valid!"
            return
        # ==================

        try:
            self.s3_client = boto3.client("s3")
        except Exception as e:
            self.print_message("[S3Monitor] ERROR - (S3 Check) Could not make S3 client")
            self.print_message("[S3Monitor] Exception: " + str(e))
            self.had_internal_error = True
            self.internal_error_reason = "Could not make S3 client for S3 Monitor"
            return

    def check_for_file_change(self):
        """Look up the latest version of the tracked S3 file and flag it for replacement when it changed.

        s3_file_needs_replacing is set (and the cached VersionId / LastModified
        updated) when either value differs from what was seen on the last check.
        """
        try:
            version_check_response = self.s3_client.list_object_versions(
                Bucket=self.s3_bucket_name,
                Prefix=self.s3_file_name_only_path)

            for version in version_check_response.get("Versions", []):
                # The prefix (file name without extension) can also match sibling keys,
                # so only consider entries for the file that is actually downloaded.
                # Entries without a "Key" are treated as the tracked file.
                if version.get("Key", self.s3_file_name) != self.s3_file_name:
                    continue
                if version["IsLatest"] != True:
                    continue

                if (version["VersionId"] != self.s3_current_object_version_id or
                        version["LastModified"] != self.s3_current_object_last_modified):
                    self.print_message("[S3Monitor] Found new version of Canary/Application in S3!")
                    self.print_message("[S3Monitor] Changing running Canary/Application to new one...")
                    # Will be checked by thread to trigger replacing the file
                    self.s3_file_needs_replacing = True
                    self.s3_current_object_version_id = version["VersionId"]
                    self.s3_current_object_last_modified = version["LastModified"]
                # The latest version of the tracked key has been handled
                return
        except Exception as e:
            self.print_message("[S3Monitor] ERROR - Could not check for new version of file in S3 due to exception!")
            self.print_message("[S3Monitor] Exception: " + str(e))
            self.had_internal_error = True
            self.internal_error_reason = "Could not check for S3 file due to exception in S3 client"

    def replace_current_file_for_new_file(self):
        """Download the S3 file into tmp/, unzip it if needed, and move it to canary_local_application_path.

        Any failing step records had_internal_error / internal_error_reason and
        returns; s3_file_needs_replacing is only cleared after a successful swap.
        """
        # Function-scope import: only this method needs the permission bit constants
        import stat

        try:
            self.print_message("[S3Monitor] Making directory...")
            os.makedirs("tmp", exist_ok=True)
        except Exception as e:
            self.print_message ("[S3Monitor] ERROR - could not make tmp directory to place S3 file into!")
            self.print_message("[S3Monitor] Exception: " + str(e))
            self.had_internal_error = True
            self.internal_error_reason = "Could not make TMP folder for S3 file download"
            return

        # Download the file
        new_file_path = "tmp/new_file" + self.s3_file_name_only_extension
        try:
            self.print_message("[S3Monitor] Downloading file...")
            s3_resource = boto3.resource("s3")
            s3_resource.meta.client.download_file(self.s3_bucket_name, self.s3_file_name, new_file_path)
        except Exception as e:
            self.print_message("[S3Monitor] ERROR - could not download latest S3 file into TMP folder!")
            self.print_message("[S3Monitor] Exception: " + str(e))
            self.had_internal_error = True
            self.internal_error_reason = "Could not download latest S3 file into TMP folder"
            return

        # Is it a zip file?
        if (self.s3_file_name_in_zip != None):
            self.print_message("[S3Monitor] New file is zip file. Unzipping...")
            # A corrupt or unreadable archive is reported like every other failing step
            try:
                with zipfile.ZipFile(new_file_path, 'r') as zip_file:
                    zip_file.extractall("tmp/new_file_zip")
            except Exception as e:
                self.print_message("[S3Monitor] ERROR - could not unzip the downloaded S3 file!")
                self.print_message("[S3Monitor] Exception: " + str(e))
                self.had_internal_error = True
                self.internal_error_reason = "Could not unzip downloaded S3 file"
                return
            new_file_path = "tmp/new_file_zip/" + self.s3_file_name_in_zip_only_path + self.s3_file_name_in_zip_only_extension

        try:
            self.print_message("[S3Monitor] Moving file...")
            # os.replace overwrites an existing destination file, so no separate remove is needed
            os.replace(new_file_path, self.canary_local_application_path)
        except Exception as e:
            self.print_message("[S3Monitor] ERROR - could not move file into local application path due to exception!")
            self.print_message("[S3Monitor] Exception: " + str(e))
            self.had_internal_error = True
            self.internal_error_reason = "Could not move file into local application path"
            return

        self.print_message("[S3Monitor] Getting execution rights...")
        try:
            # Equivalent of "chmod u+x" without building a shell command from the path
            current_mode = os.stat(self.canary_local_application_path).st_mode
            os.chmod(self.canary_local_application_path, current_mode | stat.S_IXUSR)
        except OSError as e:
            # Best effort, as before: a failed chmod is logged but does not abort the swap
            self.print_message("[S3Monitor] WARNING - could not set execution rights on local application path!")
            self.print_message("[S3Monitor] Exception: " + str(e))

        self.print_message("[S3Monitor] New file downloaded and moved into correct location!")
        self.s3_file_needs_replacing = False

    def stop_monitoring(self):
        # Stub - just added for consistency
        pass

    def start_monitoring(self):
        # Stub - just added for consistency
        pass

    def restart_monitoring(self):
        # Stub - just added for consistency
        pass

    def cleanup_monitor(self, error_occurred=False):
        # Stub - just added for consistency. error_occurred is accepted (and ignored)
        # so the signature matches the other monitors in this file.
        pass

    def monitor_loop_function(self, time_passed=30):
        """Run one monitoring tick, which is just a check for a new file version. time_passed is unused."""
        self.check_for_file_change()

    def print_message(self, message):
        """Send the message to the data snapshot's print_message, or to stdout when no snapshot is set."""
        if (self.data_snapshot != None):
            self.data_snapshot.print_message(message)
        else:
            print(message, flush=True)
# ================================================================================
# Cuts a ticket to SIM using a temporary Cloudwatch metric that is quickly created, triggered, and destroyed.
# Can be called in any thread - creates its own Cloudwatch client and any data it needs is passed in.
#
# See (https://w.amazon.com/bin/view/CloudWatchAlarms/Internal/CloudWatchAlarmsSIMTicketing) for more details
# on how the alarm is sent using Cloudwatch.
def cut_ticket_using_cloudwatch(
        ticket_description="Description here!",
        ticket_reason="Reason here!",
        ticket_severity=5,
        ticket_category="AWS",
        ticket_type="SDKs and Tools",
        ticket_item="IoT SDK for CPP",
        ticket_group="AWS IoT Device SDK",
        ticket_allow_duplicates=False,
        git_repo_name="REPO NAME",
        git_hash="HASH",
        git_hash_as_namespace=False,
        git_fixed_namespace_text="mqtt5_canary",
        cloudwatch_region="us-east-1"):
    """Cut a ticket by creating a temporary Cloudwatch alarm, forcing it into ALARM, and deleting it.

    Args:
        ticket_description: Text placed in the DESCRIPTION part of the alarm description.
        ticket_reason: Text placed in the REASON part of the alarm description.
        ticket_severity, ticket_category, ticket_type, ticket_item, ticket_group:
            Fields embedded, in that order, in the ticket ARN used as the alarm action.
        ticket_allow_duplicates: When True, "DO-NOT-DEDUPE" is appended to the ARN.
        git_repo_name, git_hash: Used to build the alarm name "<repo>-<hash>-AUTO-TICKET".
        git_hash_as_namespace: When False the metric namespace is git_fixed_namespace_text,
            otherwise it is "<repo>-<hash>".
        git_fixed_namespace_text: Namespace used when git_hash_as_namespace is False.
        cloudwatch_region: Region the Cloudwatch client is created in.

    Returns:
        None. Failures are printed to stdout instead of being raised.
    """
    git_metric_namespace = ""
    if (git_hash_as_namespace == False):
        git_metric_namespace = git_fixed_namespace_text
    else:
        git_namespace_prepend_text = git_repo_name + "-" + git_hash
        git_metric_namespace = git_namespace_prepend_text

    try:
        cloudwatch_client = boto3.client('cloudwatch', cloudwatch_region)
        ticket_alarm_name = git_repo_name + "-" + git_hash + "-AUTO-TICKET"
    except Exception as e:
        print ("ERROR - could not create Cloudwatch client to make ticket metric alarm due to exception!")
        print ("Exception: " + str(e), flush=True)
        return

    new_metric_dimensions = []
    if (git_hash_as_namespace == False):
        git_namespace_prepend_text = git_repo_name + "-" + git_hash
        new_metric_dimensions.append(
            {"Name": git_namespace_prepend_text, "Value": ticket_alarm_name})
    else:
        new_metric_dimensions.append(
            {"Name": "System_Metrics", "Value": ticket_alarm_name})

    ticket_arn = f"arn:aws:cloudwatch::cwa-internal:ticket:{ticket_severity}:{ticket_category}:{ticket_type}:{ticket_item}:{ticket_group}:"
    if (ticket_allow_duplicates == True):
        # use "DO-NOT-DEDUPE" so we can run the same commit again and it will cut another ticket.
        ticket_arn += "DO-NOT-DEDUPE"
    # In the ticket ARN, all spaces need to be replaced with +
    ticket_arn = ticket_arn.replace(" ", "+")

    ticket_alarm_description = f"AUTO CUT CANARY WRAPPER TICKET\n\nREASON: {ticket_reason}\n\nDESCRIPTION: {ticket_description}\n\n"

    # Register a metric alarm so it can auto-cut a ticket for us
    try:
        cloudwatch_client.put_metric_alarm(
            AlarmName=ticket_alarm_name,
            AlarmDescription=ticket_alarm_description,
            MetricName=ticket_alarm_name,
            Namespace=git_metric_namespace,
            Statistic="Maximum",
            Dimensions=new_metric_dimensions,
            Period=60,  # How long (in seconds) is an evaluation period?
            EvaluationPeriods=1,  # How many periods does it need to be invalid for?
            DatapointsToAlarm=1,  # How many data points need to be invalid?
            Threshold=1,
            ComparisonOperator="GreaterThanOrEqualToThreshold",
            # The data above does not really matter - it just needs to be valid input data.
            # This is the part that tells Cloudwatch to cut the ticket
            AlarmActions=[ticket_arn]
        )
    except Exception as e:
        print ("ERROR - could not create ticket metric alarm due to exception!")
        print ("Exception: " + str(e), flush=True)
        return

    # Trigger the alarm so it cuts the ticket
    ticket_was_cut = False
    try:
        cloudwatch_client.set_alarm_state(
            AlarmName=ticket_alarm_name,
            StateValue="ALARM",
            StateReason="AUTO TICKET CUT")
        ticket_was_cut = True
    except Exception as e:
        print ("ERROR - could not cut ticket due to exception!")
        print ("Exception: " + str(e), flush=True)

    if ticket_was_cut:
        print("Waiting for ticket metric to trigger...", flush=True)
        # Wait a little bit (2 seconds)...
        time.sleep(2)

    # Remove the temporary alarm even when triggering it failed, so it is never left behind
    print("Removing ticket metric...", flush=True)
    try:
        cloudwatch_client.delete_alarms(AlarmNames=[ticket_alarm_name])
    except Exception as e:
        print ("ERROR - could not remove ticket metric alarm due to exception!")
        print ("Exception: " + str(e), flush=True)
        return

    if ticket_was_cut:
        print ("Finished cutting ticket via Cloudwatch!", flush=True)
    return
# A helper function that takes most of the ticket information from the parsed argparse arguments.
def cut_ticket_using_cloudwatch_from_args(
        ticket_description="",
        ticket_reason="",
        ticket_severity=6,
        arguments=None):
    """Cut a ticket via cut_ticket_using_cloudwatch, reading the ticket and git fields from arguments.

    Nothing happens for a severity of 6 or higher. Otherwise arguments must
    provide ticket_category, ticket_type, ticket_item, ticket_group,
    git_repo_name, git_hash and git_hash_as_namespace. Duplicates are never allowed.
    """
    # Do not cut a ticket for a severity of 6+
    if (ticket_severity >= 6):
        return

    ticket_settings = {
        "ticket_description": ticket_description,
        "ticket_reason": ticket_reason,
        "ticket_severity": ticket_severity,
        "ticket_category": arguments.ticket_category,
        "ticket_type": arguments.ticket_type,
        "ticket_item": arguments.ticket_item,
        "ticket_group": arguments.ticket_group,
        "ticket_allow_duplicates": False,
        "git_repo_name": arguments.git_repo_name,
        "git_hash": arguments.git_hash,
        "git_hash_as_namespace": arguments.git_hash_as_namespace,
    }
    cut_ticket_using_cloudwatch(**ticket_settings)