#!/usr/bin/env python3 # eCall # Bulk: yes # -*- coding: utf-8 -*- "eCall SMS notification plugin for Check_MK monitoring systems using rulebased notifications" import argparse import logging import os from urllib.parse import quote from logging.handlers import TimedRotatingFileHandler from cmk.notification_plugins.utils import ( collect_context, read_bulk_contexts, get_password_from_env_or_context, substitute_context, ) from cmk_addons.plugins.ecall.lib.api import SMS, Voice class MessageFormat(object): def __init__(self, shared, service=None, host=None): self._shared = shared self._service = service self._host = host @property def service(self): if self._shared and self._service: return "{}\n{}".format(self._shared, self._service) elif self._service: # shared is empty return self._service return self._shared @property def host(self): if self._shared and self._host: return "{}\n{}".format(self._shared, self._host) elif self._host: # shared is empty return self._host return self._shared class SettingsException(Exception): pass class Settings(object): def __init__(self, notification_parameters): p = notification_parameters # keys that may just be copied: # username, password, log_file, log_level, stdout # handle simple mandatory parameters self.username = get_password_from_env_or_context("USERNAME", p) self.password = get_password_from_env_or_context("PASSWORD", p) # in case of using the store, there will be None values when changes have not been activated if self.username is None: raise SettingsException( "Got empty username. Did you activate changes after adding the user to the store?" ) if self.password is None: raise SettingsException( "Got empty password. Did you activate changes after adding the password to the store?" ) self.message_length_limit = int(p["CONTENT_LENGTH_LIMIT"]) self.log_file = os.path.expanduser(p["LOG_FILE"]) self.log_level = { "d": logging.DEBUG, "i": logging.INFO, "w": logging.WARNING, "c": logging.CRITICAL, }.get(p["LOG_LEVEL"]) # handle simple optional parameters if "PROXY_HOST" in p: proto = p["PROXY_PROTOCOL"] host = p["PROXY_HOST"] port = p["PROXY_PORT"] user = p.get("PROXY_AUTHENTICATION_USER") if user: user = quote(user) if p.get("PROXY_AUTHENTICATION_PASSWORD_1"): passwd = get_password_from_env_or_context("PROXY_AUTHENTICATION_PASSWORD", p) passwd = quote(passwd) if user and passwd: url = f"{proto}://{user}:{passwd}@{host}:{port}" elif user: url = f"{proto}://{user}@{host}:{port}" else: url = f"{proto}://{host}:{port}" self.proxy = { "https": url #proto: url } else: self.proxy = None if "SSL_SKIP_VERIFY" in p and p["SSL_SKIP_VERIFY"] == "True": self.ssl_skip_verify = True else: self.ssl_skip_verify = False if "STDOUT" in p and p["STDOUT"] == "True": self.log_stdout = True else: self.log_stdout = False # handle complex mandatory parameters ## parse message format self.bulk_message_prefix = "" if "CONTENT_BULK_PREFIX" in p: self.bulk_message_prefix = p["CONTENT_BULK_PREFIX"] message_format_shared = p["CONTENT_SHARED"] message_format_specifc = {} if "CONTENT_HOST" in p: message_format_specifc["host"] = p["CONTENT_HOST"] if "CONTENT_SERVICE" in p: message_format_specifc["service"] = p["CONTENT_SERVICE"] self.message_format = MessageFormat(message_format_shared, **message_format_specifc) ## iterate over every action parameter key in params self.api_class = SMS if p["ACTION_1"] == "sms" else Voice self.api_params = {} for param_key in filter(lambda k: k.startswith("ACTION_2_"), p.keys()): key = param_key.split("_")[2].lower() val = p[param_key] self.api_params[key] = val if level := self.api_params.get("notificationlevel"): self.api_params["notificationlevel"] = level.split("_")[1] if nolog := self.api_params.get("nolog"): if nolog == "True": self.api_params["nolog"] = 1 else: self.api_params["nolog"] = 0 class Notification(object): def __init__(self, context, message_format, message_length_limit): self._logger = logging.getLogger("ecall_plugin") self._context = context self._message_format = message_format self._length_limit = message_length_limit self.contact = context["CONTACTPAGER"] def _limit(self, text): self._logger.info("Limiting message length to at most %i characters", self._length_limit) return text[: self._length_limit] @property def message(self): if self._context["WHAT"] == "SERVICE": message = substitute_context(self._message_format.service, self._context) else: message = substitute_context(self._message_format.host, self._context) self._logger.debug("Unlimited message is '%s'", message) return self._limit(message) class BulkNotification(Notification): def __init__(self, context_list, message_format, message_length_limit, bulk_message_prefix): # call super to get shared properties # just use the first context as reference for contact # the official plugins also assume that contact is the same for every context super(BulkNotification, self).__init__( context_list[0], message_format, message_length_limit ) self._prefix = bulk_message_prefix self._context_list = context_list def __format_message(self): prefixes = [] affected_hosts = [] # TODO: This is currently not being used service_states = {} host_states = {} self._logger.info("Processing bulk context list") for context in self._context_list: self._logger.debug("Processing context %s", context) affected_host = context["HOSTNAME"] # keep track of prefixes (if any) if self._prefix: # prefix is either HOSTGROUPNAMES or SERVICEGROUPNAMES # Both are lists of group names separated by whitespace prefixes += context[self._prefix].split(" ") # fill list of affected hosts if affected_host not in affected_hosts: affected_hosts.append(affected_host) # keep track of states # TODO: We could code logic here to only send the most recent, worst state for each item if context["WHAT"] == "SERVICE": if context["PARSED_STATE"] not in service_states: service_states[context["PARSED_STATE"]] = 1 else: service_states[context["PARSED_STATE"]] += 1 else: if context["PARSED_STATE"] not in host_states: host_states[context["PARSED_STATE"]] = 1 else: host_states[context["PARSED_STATE"]] += 1 # consolidate prefixes prefixes = list(set(prefixes)) prefix_info = "" if prefixes: prefix_info = ", ".join(prefixes) self._logger.debug("Got the following prefix for bulk message: %s", prefix_info) prefix_info += ": " # generate message self._logger.debug("Raw service states are: %s", service_states) self._logger.debug("Raw host states are: %s", host_states) service_info = map(lambda i: "%s: %i" % (i[0].capitalize(), i[1]), service_states.items()) host_info = map(lambda i: "%s: %i" % (i[0].capitalize(), i[1]), host_states.items()) message_items = [] if host_info: self._logger.debug("Processed host info is: %s", host_info) message_items.append("Hosts %s" % ", ".join(host_info)) if service_info: self._logger.debug("Processed service info is: %s", service_info) message_items.append("Services %s" % ", ".join(service_info)) return prefix_info + ", ".join(message_items) @property def message(self): # use base class when there is only one notification in bulk context if len(self._context_list) == 1: self._logger.info( "Only one context in bulk notification, use default notification method" ) return super(BulkNotification, self).message return self._limit(self.__format_message()) class ContextParser(object): "Parse variable inputs from checkmk notifier" def __init__(self): argument_parser = argparse.ArgumentParser( description="Send eCall SMS from checkmk monitoring notifications." ) argument_parser.add_argument( "--bulk", help="Enable checkmk bulk mode", action="store_true", default=False ) args = argument_parser.parse_args() # TODO: maybe this should be configurable in the GUI. # right now, the only notificationtype left is "PROBLEM". # Only for "PROBLEM" the message format is actually being # used. This is not optimal. May be let the user set a # default message format and format and state per type of # notification? This would be more flexible. self.__special_states = { "FLAPPINGSTART": ("WARNING", "started flapping"), "FLAPPINGSTOP": ("OK", "stopped flapping"), "ACKNOWLEDGEMENT": ("OK", "acknowledged"), "RECOVERY": ("OK", "recovered"), "DOWNTIMESTART": ("OK", "downtime started"), "DOWNTIMECANCELLED": ("OK", "downtime cancelled"), "DOWNTIMEEND": ("OK", "downtime ended"), } self.bulk = args.bulk if args.bulk: self.params, self.context = self.__parse_bulk() else: self.params, self.context = self.__parse_rulebased() def __parse_filter_context(self, context, params, is_bulk=False): "Add custom macros to the context and strip out any parameters" what = context["WHAT"] notification_type = context["NOTIFICATIONTYPE"] context["PARSED_ITEM"] = context["HOSTNAME"] if what == "SERVICE": if params.get("SHOW_HOST_IF_SERVICE", "True") == "True": context["PARSED_ITEM"] = "%s on %s" % ( context["SERVICEDESC"], context["PARSED_ITEM"], ) else: context["PARSED_ITEM"] = context["SERVICEDESC"] if notification_type in self.__special_states: context["PARSED_STATE"], context["PARSED_OUTPUT"] = self.__special_states[ notification_type ] if is_bulk: # Since we are merely counting the notifications for each state # it makes no sense to keep the original OK/WARN/CRIT/UP/DOWN here. # Instead, we overwrite it with the pretty output (e.g. "started flapping") # so that this is counted in the bulk message. # e.g. Host Started flapping: 1, Downtime ended: 2, Down: 2, Service OK: 3, acknowledged: 4 context["PARSED_STATE"] = context["PARSED_OUTPUT"] else: context["PARSED_OUTPUT"] = context["%sOUTPUT" % what] context["PARSED_STATE"] = context["%sSTATE" % what] # return new context without parameters (since they would expose passwords) return dict(filter(lambda i: not i[0].startswith("PARAMETER_"), context.items())) def __parse_parameters(self, context): params = {} for param_key in filter(lambda k: k.startswith("PARAMETER_"), context.keys()): params[param_key[10::]] = context[param_key] return params def __parse_rulebased(self): context = collect_context() params = self.__parse_parameters(context) return params, self.__parse_filter_context(context, params) def __parse_bulk(self): params, context_list = read_bulk_contexts() params = self.__parse_parameters(params) # add custom fields to bulk contexts # when there are no more than 1 contexts in the list, do not use bulk mode (--> is_bulk=False) enriched_contexts = [ self.__parse_filter_context(context, params, is_bulk=len(context_list) > 1) for context in context_list ] return params, enriched_contexts class Plugin(object): """ Check_MK notification plugin to notify events using eCall's HTTP/S API. """ def __init__(self): parser = ContextParser() settings = Settings(parser.params) self._logger = logging.getLogger("ecall_plugin") self.__configure_logging(settings.log_level, settings.log_file, settings.log_stdout) self._logger.info("Starting eCall notification process") self._logger.debug("Notification context is %s", parser.context) if parser.bulk: notification = BulkNotification( parser.context, settings.message_format, settings.message_length_limit, settings.bulk_message_prefix, ) else: notification = Notification( parser.context, settings.message_format, settings.message_length_limit ) self._logger.debug("Formatted message is %s", notification.message) self._logger.debug("Initialising eCall API Object") self.api = settings.api_class( settings.username, settings.password, notification.contact, notification.message, requests_proxy_hosts=settings.proxy, requests_verify_cert=not settings.ssl_skip_verify, **settings.api_params, ) self._logger.debug("Initialisation done") def __configure_logging(self, log_level, log_file, log_stdout): self._logger.setLevel(log_level) log_format = logging.Formatter("%(asctime)s - %(levelname)s | %(message)s") loghandler_file = TimedRotatingFileHandler( log_file, when="midnight", interval=1, backupCount=10 ) loghandler_file.setFormatter(log_format) loghandler_file.setLevel(log_level) self._logger.addHandler(loghandler_file) if log_stdout: logger_console = logging.StreamHandler() logger_console.setFormatter(log_format) logger_console.setLevel(log_level) self._logger.addHandler(logger_console) def notify(self): """ Prepare input data and initiate notification sending """ self._logger.info("Notification process started") self.api.send() self._logger.info("Notification process finished") Plugin().notify()