#!/usr/bin/env python3
# Copyright (C) 2023 Spearhead Systems SRL - License: GNU General Public License v2
"""Checkmk check plugin reporting the expiry of Azure Key Vault certificates."""

import json
from datetime import datetime, timezone

from cmk.base.plugins.agent_based.agent_based_api.v1 import register, Result, Service, State


def parse_keyvault(string_table):
    """Convert JSON entries into dictionaries indexed by certificate name.

    The first column of every row is treated as a fragment of a JSON
    document.  Fragments are accumulated until a row consisting solely of
    "]" is seen, at which point the accumulated text is decoded and its
    elements are collected.  Several consecutive JSON arrays are therefore
    merged into one lookup table.

    Returns a dict mapping each entry's "name" to the entry itself.  When
    the same name occurs more than once, the last occurrence wins.  Any
    fragments following the final "]" are discarded.

    Raises json.JSONDecodeError if an accumulated document is not valid JSON.
    """
    fragments = []
    cert_data = []
    for row in string_table:
        if not row:
            # Nothing to accumulate; indexing an empty row would raise IndexError.
            continue
        line = row[0]
        fragments.append(line)
        if line == "]":
            # A closing bracket on its own line terminates one JSON array.
            cert_data.extend(json.loads("".join(fragments)))
            fragments = []
    return {cert["name"]: cert for cert in cert_data}


register.agent_section(
    name="azure_keyvault",
    parse_function=parse_keyvault,
)


def discover_keyvault(section):
    """Produce one service per certificate found in the parsed output."""
    for name in sorted(section):
        yield Service(item=name)


def _parse_expiry(raw):
    """Return *raw* as a timezone-aware datetime, or None if it is unusable."""
    if not isinstance(raw, str):
        return None
    text = raw.strip()
    if text.endswith(("Z", "z")):
        # datetime.fromisoformat() only accepts the "Z" suffix from Python 3.11 on.
        text = text[:-1] + "+00:00"
    try:
        expires = datetime.fromisoformat(text)
    except ValueError:
        return None
    if expires.tzinfo is None:
        # NOTE(review): naive timestamps are assumed to be UTC - confirm against
        # the agent output.  Without a timezone the subtraction from an aware
        # "now" would raise TypeError.
        expires = expires.replace(tzinfo=timezone.utc)
    return expires


def check_keyvault(item, params, section):
    """Report the state of one certificate based upon its expiry.

    item    -- certificate name, as produced by discover_keyvault
    params  -- mapping with optional "warn_days" / "crit_days" thresholds;
               a missing threshold disables the corresponding level
    section -- lookup table produced by parse_keyvault

    Yields nothing when the certificate is absent from the section.  Yields
    an UNKNOWN result when the certificate carries no parseable expiry date.
    Otherwise yields a single result that is CRIT when fewer than crit_days
    whole days remain, WARN when fewer than warn_days remain, and OK in all
    other cases.
    """
    warn_days = params.get("warn_days")
    crit_days = params.get("crit_days")

    cert = section.get(item)
    if cert is None:
        return

    attributes = cert.get("attributes") or {}
    expires = _parse_expiry(attributes.get("expires"))
    if expires is None:
        yield Result(state=State.UNKNOWN, summary="No valid expiry date available")
        return

    # timedelta.days floors, so a certificate that has already expired
    # produces a negative number of remaining days.
    remaining_days = (expires - datetime.now(timezone.utc)).days

    state = State.OK
    if crit_days is not None and remaining_days < crit_days:
        state = State.CRIT
    elif warn_days is not None and remaining_days < warn_days:
        state = State.WARN

    yield Result(state=state, summary=f"Expires in {remaining_days} days")


register.check_plugin(
    name="azure_keyvault",
    service_name="Azure Keyvault Certificate %s",
    check_function=check_keyvault,
    check_default_parameters={},
    check_ruleset_name="azure_keyvault",
    discovery_function=discover_keyvault,
)