checkmk-plugins/check_mk-azure/local/lib/check_mk/base/plugins/agent_based/azure_common.py

279 lines
7.0 KiB
Python

#!/usr/bin/env python3
# Copyright (C) 2024 Spearhead Systems SRL
import json
from datetime import datetime, timezone
from cmk.base.plugins.agent_based.agent_based_api.v1 import register, Result, Service, State, Metric
def check_state_below(alert_percentages, measured_percent):
    """Return the state for a value that is unhealthy when it is too low.

    `alert_percentages` is indexed as [0] = warning threshold and
    [1] = critical threshold. A falsy value (e.g. None or an empty tuple)
    means no thresholds are configured and the result is always OK.

    A threshold triggers when the measured value is at or below it; the
    critical threshold is tested first so it wins when both would match.
    """
    if not alert_percentages:
        return State.OK
    if alert_percentages[1] >= measured_percent:
        return State.CRIT
    if alert_percentages[0] >= measured_percent:
        return State.WARN
    return State.OK
def check_state_above(alert_percentages, measured_percent):
    """Return the state for a value that is unhealthy when it is too high.

    `alert_percentages` is indexed as [0] = warning threshold and
    [1] = critical threshold. A falsy value (e.g. None or an empty tuple)
    means no thresholds are configured and the result is always OK.

    A threshold triggers when the measured value is at or above it; the
    critical threshold is tested first so it wins when both would match.
    """
    if not alert_percentages:
        return State.OK
    if alert_percentages[1] <= measured_percent:
        return State.CRIT
    if alert_percentages[0] <= measured_percent:
        return State.WARN
    return State.OK
# Convert JSON entries into a dictionary keyed by "<name>#<resource_group>".
# We're assuming here that the name is unique within each resource group. If
# not, also add the 'location' field of each object to the key.
def parse(string_table):
    """Decode the section's JSON rows into a dict of objects.

    The first element of every row in `string_table` is decoded as one JSON
    document. Each decoded object is stored under the key
    "<name>#<resource_group>", built from its "name" and "resource_group"
    entries. When two rows produce the same key, the later row wins.

    Raises whatever `json.loads` raises on malformed input, and KeyError
    when an object lacks "name" or "resource_group".
    """
    decoded = (json.loads(row[0]) for row in string_table)
    return {
        f"{entry['name']}#{entry['resource_group']}": entry
        for entry in decoded
    }
# Produce a list of Azure objects for discovery.
def discover(section):
    """Yield one Service per entry of the parsed section.

    Services are produced in sorted order of the section's keys, and each
    key is used unchanged as the service item.
    """
    yield from (Service(item=item_name) for item_name in sorted(section))
# Given a specific keyvault item, look it up in the parsed output, and produce
# results for that service based upon each metric's range.
def check_keyvault(item, params, section):
    """Yield results and metrics for one Key Vault entry of the section.

    Nothing is yielded when `item` is not a key of `section`. Otherwise the
    entry's "metrics" mapping is read and reported in this fixed order:

    * "Availability"      -> Result (low values alert) + metric "availability"
    * "SaturationShoebox" -> Result (high values alert) + metric "capacity"
    * "ServiceApiLatency" -> Result (high values alert) + metric "latency"
    * "ServiceApiHit"     -> metric "hits" only
    * "ServiceApiResult"  -> metric "results" only

    Any of these that is missing (None) produces an UNKNOWN "<Label>: N/A"
    Result instead. Thresholds come from the "availability", "capacity" and
    "latency" entries of `params`; an absent entry disables alerting for
    that value.
    """
    entry = section.get(item)
    if entry is None:
        return

    # All lookups happen up front, before the first result is produced.
    readings = entry["metrics"]
    availability = readings.get("Availability")
    capacity = readings.get("SaturationShoebox")
    latency = readings.get("ServiceApiLatency")
    api_hits = readings.get("ServiceApiHit")
    api_results = readings.get("ServiceApiResult")

    availability_levels = params.get("availability")
    capacity_levels = params.get("capacity")
    latency_levels = params.get("latency")

    # Availability: lower is worse.
    if availability is None:
        yield Result(state=State.UNKNOWN, summary="Availability: N/A")
    else:
        yield Result(
            state=check_state_below(availability_levels, availability),
            summary=f"Availability: {availability}%",
        )
        yield Metric(name="availability", value=availability, boundaries=(0, 100))

    # Capacity (saturation): higher is worse.
    if capacity is None:
        yield Result(state=State.UNKNOWN, summary="Capacity: N/A")
    else:
        yield Result(
            state=check_state_above(capacity_levels, capacity),
            summary=f"Capacity: {capacity}%",
        )
        yield Metric(name="capacity", value=capacity, boundaries=(0, 100))

    # Latency: higher is worse.
    if latency is None:
        yield Result(state=State.UNKNOWN, summary="Latency: N/A")
    else:
        yield Result(
            state=check_state_above(latency_levels, latency),
            summary=f"Latency: {latency}ms",
        )
        yield Metric(name="latency", value=latency, boundaries=(0, None))

    # Hits and results are recorded as metrics only; they never alert
    # unless the value itself is missing.
    if api_hits is None:
        yield Result(state=State.UNKNOWN, summary="Hits: N/A")
    else:
        yield Metric(name="hits", value=api_hits, boundaries=(0, None))

    if api_results is None:
        yield Result(state=State.UNKNOWN, summary="Results: N/A")
    else:
        yield Metric(name="results", value=api_results, boundaries=(0, None))
# Given a specific firewall item, look it up in the parsed output, and produce
# results for that service based upon each metric's range.
def check_firewall(item, params, section):
    """Yield results and metrics for one firewall entry of the section.

    Nothing is yielded when `item` is not a key of `section`. Otherwise the
    entry's "metrics" mapping is read and reported in this fixed order:

    * "FirewallHealth"     -> Result (low values alert) + metric "availability"
    * "FirewallLatencyPng" -> Result (high values alert) + metric "latency"
    * "Throughput"         -> metric "throughput" only

    Any of these that is missing (None) produces an UNKNOWN "<Label>: N/A"
    Result instead. Thresholds come from the "availability" and "latency"
    entries of `params`; an absent entry disables alerting for that value.
    """
    entry = section.get(item)
    if entry is None:
        return

    # All lookups happen up front, before the first result is produced.
    readings = entry["metrics"]
    availability = readings.get("FirewallHealth")
    traffic = readings.get("Throughput")
    latency = readings.get("FirewallLatencyPng")

    availability_levels = params.get("availability")
    latency_levels = params.get("latency")

    # Health: lower is worse.
    if availability is None:
        yield Result(state=State.UNKNOWN, summary="Availability: N/A")
    else:
        yield Result(
            state=check_state_below(availability_levels, availability),
            summary=f"Availability: {availability}%",
        )
        yield Metric(name="availability", value=availability, boundaries=(0, 100))

    # Latency: higher is worse.
    if latency is None:
        yield Result(state=State.UNKNOWN, summary="Latency: N/A")
    else:
        yield Result(
            state=check_state_above(latency_levels, latency),
            summary=f"Latency: {latency}ms",
        )
        yield Metric(name="latency", value=latency, boundaries=(0, None))

    # Throughput is recorded as a metric only; it never alerts unless the
    # value itself is missing.
    if traffic is None:
        yield Result(state=State.UNKNOWN, summary="Throughput: N/A")
    else:
        yield Metric(name="throughput", value=traffic, boundaries=(0, None))
def check_defender(item, params, section):
    """Yield a single Result for one Defender alert that is still open.

    Nothing is yielded when `item` is not a key of `section`, or when the
    alert's "status" is neither "Active" nor "InProgress". Otherwise the
    alert's "severity" selects the state:

        "High" -> CRIT, "Medium" -> WARN, "Low" -> OK, anything else -> UNKNOWN

    and the summary combines the alert's status, "info" and "url" fields.
    `params` is accepted for interface compatibility but is not read.
    """
    entry = section.get(item)
    if entry is None:
        return

    details = entry["alert"]
    status = details["status"]
    # NOTE(review): a closed alert produces no Result at all here; confirm
    # that an empty check result is how such services should be reported.
    if status not in ("Active", "InProgress"):
        return

    severity = details["severity"]
    url = details["url"]
    info = details["info"]

    # Ordered (label, state) pairs; the first label equal to the severity wins.
    severity_states = (
        ("High", State.CRIT),
        ("Medium", State.WARN),
        ("Low", State.OK),
    )
    state = next(
        (mapped for label, mapped in severity_states if severity == label),
        State.UNKNOWN,
    )

    yield Result(state=state, summary=f"{status}: {info}: {url}")
# Plugin registration. Each Azure object type gets an agent section and a
# check plugin of the same name. All three sections share parse() and all
# three plugins share discover(); only the check function differs.
#
# Default parameters are empty everywhere: the Key Vault and firewall check
# functions read thresholds with params.get(), so a missing key means that
# value is reported without alerting. check_defender does not read params.

# Key Vault: evaluated by check_keyvault().
register.agent_section(
    name="azure_keyvault",
    parse_function=parse
)
register.check_plugin(
    name="azure_keyvault",
    service_name="Azure Keyvault Metric %s",
    check_function=check_keyvault,
    check_default_parameters={},
    check_ruleset_name="azure_keyvault",
    discovery_function=discover,
)
# Firewall: evaluated by check_firewall().
register.agent_section(
    name="azure_firewall",
    parse_function=parse
)
register.check_plugin(
    name="azure_firewall",
    service_name="Azure Firewall Metric %s",
    check_function=check_firewall,
    check_default_parameters={},
    check_ruleset_name="azure_firewall",
    discovery_function=discover,
)
# Defender alerts: evaluated by check_defender().
register.agent_section(
    name="azure_defender",
    parse_function=parse
)
register.check_plugin(
    name="azure_defender",
    service_name="Azure Defender Alert %s",
    check_function=check_defender,
    check_default_parameters={},
    check_ruleset_name="azure_defender",
    discovery_function=discover,
)