SCO-241: Update domain_checks plugin to use 2.3.0 CheckMK plugin API (since 2.4.0 requires this API). Also merge discovery and check GUIs into single pages.

This commit is contained in:
Marsell Kukuljevic 2025-05-12 21:15:08 +02:00
parent 475fde79c1
commit bbac7b11ca
18 changed files with 404 additions and 399 deletions

Binary file not shown.

BIN
domains/domain_checks-0.2.0.mkp Executable file

Binary file not shown.

View File

@ -1,80 +0,0 @@
#!/usr/bin/env python3
# Copyright (C) 2025 Spearhead Systems SRL
import datetime
from cmk.base.plugins.agent_based.agent_based_api.v1 import register, Result, Service, State
# Incoming agent output takes the form:
#
# google.com:
# 2028-09-14
# yahoo.com:
# 2026-01-19
#
# The domains being queries end with a colon. The expiry response does not.
#
# Return a dictionary which uses domain as key, and returns an expiry as
# datetime.date.
def parse_domains_expiry(string_table):
expiries = {}
current_domain = ""
for line in string_table:
line = line[0]
domain = line[:-1]
ending = line[-1]
if ending == ":":
current_domain = domain
else:
date = datetime.datetime.strptime(line, '%Y-%m-%d').date()
expiries[current_domain] = date
return expiries
def discover_domains_expiry(section):
for domain, date in section.items():
yield Service(item=domain)
def check_domains_expiry(item, params, section):
expiry = section.get(item)
if not expiry:
yield Result(state=State.WARN, summary="Expiry not found in whois")
return
alert_delta = params.get("days_remaining")
if not alert_delta:
yield Result(state=State.WARN, summary="No expiry check rule configured")
return
valid_days_left = (expiry - datetime.date.today()).days
summary = "Domain expires in %s days" % valid_days_left
state = State.OK
if valid_days_left < alert_delta[1]:
state = State.CRIT
elif valid_days_left < alert_delta[0]:
state = State.WARN
yield Result(state=state, summary=summary)
register.agent_section(
name="domains_expiry",
parse_function=parse_domains_expiry
)
register.check_plugin(
name="domains_expiry",
service_name="Expiry for Domain '%s'",
discovery_function=discover_domains_expiry,
check_function=check_domains_expiry,
check_default_parameters={},
check_ruleset_name="domains_expiry",
)

View File

@ -1,100 +0,0 @@
#!/usr/bin/env python3
# Copyright (C) 2025 Spearhead Systems SRL
from cmk.base.plugins.agent_based.agent_based_api.v1 import register, Result, Service, State
# Incoming agent output takes the form:
#
# google.com:
# ns3.google.com.
# ns1.google.com.
# ns4.google.com.
# ns2.google.com.
# yahoo.com:
# ns2.yahoo.com.
# ns1.yahoo.com.
# ns3.yahoo.com.
# ns4.yahoo.com.
# ns5.yahoo.com.
#
# The domains being queries end with a colon. The nameserver responses end with
# a period. Nameservers follow the domain being queried.
#
# Return a dictionary which uses domain as key, and returns a list of associated
# nameservers for that key.
def parse_domains_nameservers(string_table):
nameservers = {}
current_domain = ""
for line in string_table:
domain = line[0][:-1]
ending = line[0][-1]
if ending == ":":
current_domain = domain
nameservers[current_domain] = []
elif ending == ".":
nameservers[current_domain].append(domain)
return nameservers
def discover_domains_nameservers(section):
for domain, nameservers in section.items():
yield Service(item=domain)
def check_domains_nameservers(item, params, section):
nameservers = section.get(item)
if not nameservers:
yield Result(state=State.WARN, summary="Nameservers missing")
return
configs = params.get("domain_nameservers")
if not configs:
yield Result(state=State.WARN, summary="No nameservers check rule configured")
return
alert_level = params["alert_level"]
nameservers.sort()
domain_found = False
for config in configs:
domains = config["domains"]
if item in domains:
domain_found = True
expected_nameservers = config["nameservers"]
expected_nameservers.sort()
if expected_nameservers == nameservers:
yield Result(state=State.OK, summary="Expected nameservers present")
else:
expected_str = ", ".join(expected_nameservers)
found_str = ", ".join(nameservers)
yield Result(state=State(alert_level), summary="Mismatch in nameservers. Expected: [%s], Found: [%s]" % (expected_str, found_str))
if not domain_found:
yield Result(state=State.WARN, summary="Domain not found in any nameserver check rule")
register.agent_section(
name="domains_nameservers",
parse_function=parse_domains_nameservers
)
register.check_plugin(
name="domains_nameservers",
service_name="Nameservers for Domain '%s'",
discovery_function=discover_domains_nameservers,
check_function=check_domains_nameservers,
check_default_parameters={},
check_ruleset_name="domains_nameservers",
)

View File

@ -0,0 +1,64 @@
#!/usr/bin/env python3
# Copyright (C) 2025 Spearhead Systems SRL
import datetime
import json
from cmk.agent_based.v2 import Result, Service, State, CheckPlugin, AgentSection
# Incoming agent output is JSON of the form:
#
# {"domain": "google.com", "state": "CRIT", "expires": "2028-09-14"}
#
# "domain" and "state" are always present, whereas "expires" is optional
# (e.g. most ccTLDs do not return expiry information). The "state" is one of
# (OK, WARN, CRIT, UNKNOWN).
#
# Return a dictionary which uses domain as key, and the rest of the dictionary
# as a value.
def parse_domains_expiry(string_table):
results = {}
for [line] in string_table:
result = json.loads(line)
results[result["domain"]] = result
return results
def discover_domains_expiry(section):
for domain in section.keys():
yield Service(item=domain)
def check_domains_expiry(item, params, section):
data = section.get(item)
if not data:
yield Result(state=State.WARN, summary="Expiry not found in whois")
return
state = State[data["state"]]
expiry = data.get("expires")
if expiry:
summary = f"Expires on {expiry}"
else:
summary = "No expiry found"
yield Result(state=state, summary=summary)
agent_section_domains_expiry = AgentSection(
name="domains_expiry",
parse_function=parse_domains_expiry
)
check_plugin_domains_expiry = CheckPlugin(
name="domains_expiry",
service_name="Expiry for Domain '%s'",
discovery_function=discover_domains_expiry,
check_function=check_domains_expiry,
check_default_parameters={},
)

View File

@ -0,0 +1,72 @@
#!/usr/bin/env python3
# Copyright (C) 2025 Spearhead Systems SRL
import json
from cmk.agent_based.v2 import Result, Service, State, CheckPlugin, AgentSection
# Incoming agent output is JSON of the form:
#
# {"domain": "yahoo.com", "state": "WARN", "unexpected": ["ns5.yahoo.com"], "missing": ["ns6.yahoo.com"]}
#
# Each JSON row is for a single domain. Each row always has "domain" and "state"
# fields, with optional "unexpected" and "missing" fields. The "state" field is
# one of "OK", "WARN", "CRIT", or "UNKNOWN". The "unexpected" field is a list of
# unwanted nameservers that are present in the dig query result, whereas the
# "missing" field are wanted nameservers that are not present.
#
# Return a dictionary which uses domain as key, and a dictionary of
# state/unexpected/missing.
def parse_domains_nameservers(string_table):
results = {}
for [line] in string_table:
result = json.loads(line)
results[result["domain"]] = result
return results
def discover_domains_nameservers(section):
for domain in section.keys():
yield Service(item=domain)
def check_domains_nameservers(item, params, section):
data = section.get(item)
if not data:
yield Result(state=State.WARN, summary="Nameservers missing")
return
state = State[data["state"]]
unexpected = data.get("unexpected")
missing = data.get("missing")
summary = ""
if unexpected:
summary += f"Unexpected nameserver(s) found: {''.join(unexpected)}. "
if missing:
summary += f"Expected nameserver(s) missing: {''.join(missing)}. "
if not summary:
summary = "All expected nameservers found, none unexpected found"
yield Result(state=state, summary=summary)
agent_section_triton_wedge = AgentSection(
name="domains_nameservers",
parse_function=parse_domains_nameservers
)
check_plugin_domains_nameservers = CheckPlugin(
name="domains_nameservers",
service_name="Nameservers for Domain '%s'",
discovery_function=discover_domains_nameservers,
check_function=check_domains_nameservers,
check_default_parameters={},
)

View File

@ -0,0 +1,42 @@
#!/bin/bash
# Copyright (C) 2025 Spearhead Systems SRL
set -eu
if [[ $# < 3 ]]; then
echo "Usage: ${@: 0:1} <domains> <crit date> <warn date>" 1>&2
echo "Example: ${@: 0:1} google.com yahoo.com 2024-05-07 2024-05-21" 1>&2
exit 1
fi
# Extract from args
domains="${@: 1:$#-2}"
warn="${@: -1:1}"
crit="${@: -2:1}"
echo "<<<domains_expiry:sep(0)>>>"
for domain in $domains; do
echo -n "{\"domain\": \"$domain\", \"state\": \""
# Unfortunately, there's no actual format for whois entries, so this is a
# best-effort based on things seen in the wild. Note that ccTLDs usually
# do not publish expiry dates at all.
expires=$(whois "$domain" | grep 'Expir.*' | head -1 | grep -Eo '[0-9]{4}-[0-9]{2}-[0-9]{2}' || true)
if [[ "$expires" == "" ]]; then
echo -n "UNKNOWN"
elif [[ "$expires" < "$crit" ]]; then
echo -n "CRIT"
elif [[ "$expires" < "$warn" ]]; then
echo -n "WARN"
else
echo -n "OK"
fi
if [[ "$expires" == "" ]]; then
echo "\"}"
else
echo "\", \"expires\": \"$expires\"}"
fi
done

View File

@ -0,0 +1,47 @@
#!/usr/bin/env python3
# Copyright (C) 2025 Spearhead Systems SRL
import json
import optparse
import subprocess
print("<<<domains_nameservers:sep(0)>>>")
parser = optparse.OptionParser()
parser.add_option('-d', '--domains', action="append")
parser.add_option('-n', '--nameservers', action="append")
parser.add_option('-a', '--alert', default="WARN")
opts, _ = parser.parse_args()
alert = opts.alert
assert ["OK", "WARN", "CRIT", "UNKNOWN"].count(alert)
for domain_str, nameserver_str in zip(opts.domains, opts.nameservers):
domains = domain_str.split(",")
nameservers = set(nameserver_str.split(","))
for domain in domains:
dig_result = subprocess.run(["dig", "+short", "NS", domain], capture_output=True)
dig_ns = set(dig_result.stdout.decode("utf-8").split('.\n'))
dig_ns.remove('')
result = {
"domain": domain,
"state": "OK",
}
if dig_ns != nameservers:
result["state"] = alert
unexpected = list(dig_ns - nameservers)
missing = list(nameservers - dig_ns)
if unexpected:
result["unexpected"] = unexpected
if missing:
result["missing"] = missing
print(json.dumps(result))

View File

@ -0,0 +1,52 @@
#!/usr/bin/env python3
# Copyright (C) 2025 Spearhead Systems SRL
from cmk.rulesets.v1.form_specs.validators import LengthInRange, NumberInRange
from cmk.rulesets.v1.form_specs import Dictionary, DictElement, List, String, Integer, DefaultValue
from cmk.rulesets.v1.rule_specs import SpecialAgent, Topic, Title, Help
def _formspec():
return Dictionary(
title=Title("Domains Expiry"),
elements={
"domains": DictElement(
required=True,
parameter_form=List(
title=Title("Domain names"),
help_text=Help("List of domain names to check"),
editable_order=False,
custom_validate=(LengthInRange(min_value=1),),
element_template=String(
custom_validate=(LengthInRange(min_value=3),),
),
),
),
"days_warn": DictElement(
required=True,
parameter_form=Integer(
title=Title("Warn if expires within days"),
help_text=Help("If there are fewer days until one of the above domains expires, issue an alert"),
custom_validate=(NumberInRange(min_value=0),),
prefill=DefaultValue(30),
),
),
"days_crit": DictElement(
required=True,
parameter_form=Integer(
title=Title("Crit if expires within days"),
help_text=Help("If there are fewer days until one of the above domains expires, issue an alert"),
custom_validate=(NumberInRange(min_value=0),),
prefill=DefaultValue(7),
),
),
}
)
rule_spec_agent_config_domains_expiry = SpecialAgent(
topic=Topic.NETWORKING,
name="domains_expiry",
title=Title("Domains Expiry"),
parameter_form=_formspec,
)

View File

@ -0,0 +1,70 @@
#!/usr/bin/env python3
# Copyright (C) 2025 Spearhead Systems SRL
from cmk.rulesets.v1.form_specs.validators import LengthInRange
from cmk.rulesets.v1.form_specs import Dictionary, DictElement, SingleChoice, SingleChoiceElement, List, String, DefaultValue
from cmk.rulesets.v1.rule_specs import SpecialAgent, Topic, Title, Help
from cmk.agent_based.v2 import State
def _formspec():
return Dictionary(
title=Title("Domains Nameservers"),
elements={
"domain_nameservers": DictElement(
required=True,
parameter_form=List(
title=Title("Domains to check"),
help_text=Help("List of domain names to check"),
editable_order=False,
custom_validate=(LengthInRange(min_value=1),),
element_template=Dictionary(
elements={
"domains": DictElement(
required=True,
parameter_form=List(
title=Title("Domain names"),
help_text=Help("List of domain names the below nameservers apply to"),
custom_validate=(LengthInRange(min_value=1),),
element_template=String(
custom_validate=(LengthInRange(min_value=3),),
),
),
),
"nameservers": DictElement(
required=True,
parameter_form=List(
title=Title("Nameservers"),
help_text=Help("List of nameservers that the above domain names should have"),
custom_validate=(LengthInRange(min_value=1),),
element_template=String(
custom_validate=(LengthInRange(min_value=3),),
),
),
),
}
)
),
),
"alert_level": DictElement(
required=True,
parameter_form=SingleChoice(
title=Title("Alert level used on mismatch"),
help_text=Help("Alert level used when there is a mismatch in domain name servers for a domain"),
prefill=DefaultValue(State.WARN.name),
elements=[
SingleChoiceElement(name=State.CRIT.name, title=Title(State.CRIT.name)),
SingleChoiceElement(name=State.WARN.name, title=Title(State.WARN.name)),
SingleChoiceElement(name=State.OK.name, title=Title(State.OK.name)),
],
),
),
},
)
rule_spec_agent_config_domains_nameservers = SpecialAgent(
topic=Topic.NETWORKING,
name="domains_nameservers",
title=Title("Domains Nameservers"),
parameter_form=_formspec,
)

View File

@ -0,0 +1,23 @@
#!/usr/bin/env python3
# Copyright (C) 2025 Spearhead Systems SRL
from datetime import datetime, timedelta
from cmk.server_side_calls.v1 import noop_parser, SpecialAgentConfig, SpecialAgentCommand
def _agent_arguments(params, host_config):
today = datetime.today()
args = params["domains"]
for key in ["days_crit", "days_warn"]:
date = (today + timedelta(days=params[key])).date()
args.append(str(date))
yield SpecialAgentCommand(command_arguments=args)
special_agent_domains_expiry = SpecialAgentConfig(
name="domains_expiry",
parameter_parser=noop_parser,
commands_function=_agent_arguments,
)

View File

@ -0,0 +1,34 @@
#!/usr/bin/env python3
# Copyright (C) 2025 Spearhead Systems SRL
from cmk.server_side_calls.v1 import noop_parser, SpecialAgentConfig, SpecialAgentCommand
# Although the API makes it seem like we can yield several
# SpecialAgentCommands, the use of a table for _elems in
# lib/python3/cmk/base/sources/_builder.py (at least in 2.3.0p31) means that
# we can only emit one SpecialAgentCommands. This CheckMK bug means that
# SpecialAgentCommands() will be forgotten.
#
# Alas, that means we need to pack everything into a single command invocation.
def _agent_arguments(params, host_config):
args = []
alert_level = params["alert_level"]
for ele in params["domain_nameservers"]:
domains = ele["domains"]
nameservers = ele["nameservers"]
args.append("--domains=" + ",".join(domains))
args.append("--nameservers=" + ",".join(nameservers))
args.append("--alert=" + alert_level)
yield SpecialAgentCommand(command_arguments=args)
special_agent_domains_nameservers = SpecialAgentConfig(
name="domains_nameservers",
parameter_parser=noop_parser,
commands_function=_agent_arguments,
)

View File

@ -1,8 +0,0 @@
#!/bin/bash
# Copyright (C) 2025 Spearhead Systems SRL
echo "<<<domains_expiry:sep(0)>>>"
for domain in "$@"; do
echo "$domain:"
whois "$domain" | grep 'Expir.*' | head -1 | grep -Eo '[0-9]{4}-[0-9]{2}-[0-9]{2}'
done

View File

@ -1,8 +0,0 @@
#!/bin/bash
# Copyright (C) 2025 Spearhead Systems SRL
echo "<<<domains_nameservers:sep(0)>>>"
for domain in "$@"; do
echo "$domain:"
dig +short NS "$domain"
done

View File

@ -1,7 +0,0 @@
#!/usr/bin/env python3
# Copyright (C) 2025 Spearhead Systems SRL
def agent_domains_expiry_args(params, hostname, ipaddress):
return params["domains"]
special_agent_info["domains_expiry"] = agent_domains_expiry_args

View File

@ -1,7 +0,0 @@
#!/usr/bin/env python3
# Copyright (C) 2025 Spearhead Systems SRL
def agent_domains_nameservers_args(params, hostname, ipaddress):
return params["domains"]
special_agent_info["domains_nameservers"] = agent_domains_nameservers_args

View File

@ -1,84 +0,0 @@
#!/usr/bin/env python3
# Copyright (C) 2025 Spearhead Systems SRL
import copy
from cmk.base.plugins.agent_based.agent_based_api.v1 import State
from cmk.gui.i18n import _
from cmk.gui.plugins.wato.utils import (
rulespec_registry,
HostRulespec,
RulespecGroupCheckParametersNetworking,
CheckParameterRulespecWithItem
)
from cmk.gui.watolib.rulespecs import Rulespec
from cmk.gui.valuespec import (
Dictionary,
Integer,
ListOfStrings,
Tuple,
TextInput
)
def _valuespec_special_agents_domains_expiry_query():
return Dictionary(
title=_("Domains Expiry Query"),
required_keys=["domains"],
elements=[
(
"domains",
ListOfStrings(
title=_("Domain names"),
help=_("List of domain names to check"),
allow_empty=False,
),
),
]
)
def _valuespec_special_agents_domains_expiry_checks():
return Dictionary(
title=_("Domains Expiry"),
required_keys=["days_remaining"],
elements=[
(
"days_remaining",
Tuple(
title=_("Days Remaining"),
help=_("If there are fewer days until one of the above domains expires, issue an alert"),
elements=[
Integer(
title=_("Warn if fewer days than"),
minvalue=0,
default_value=30
),
Integer(
title=_("Crit if fewer days than"),
minvalue=0,
default_value=7
)
]
)
),
],
)
rulespec_registry.register(
HostRulespec(
name="special_agents:domains_expiry",
group=RulespecGroupCheckParametersNetworking,
match_type='dict',
valuespec=_valuespec_special_agents_domains_expiry_query,
)
)
rulespec_registry.register(
CheckParameterRulespecWithItem(
check_group_name="domains_expiry",
group=RulespecGroupCheckParametersNetworking,
match_type="dict",
parameter_valuespec=_valuespec_special_agents_domains_expiry_checks,
item_spec=lambda: TextInput(title=_("Expiry")),
title=lambda: _("Domains Expiry Checks"),
)
)

View File

@ -1,105 +0,0 @@
#!/usr/bin/env python3
# Copyright (C) 2025 Spearhead Systems SRL
import copy
from cmk.base.plugins.agent_based.agent_based_api.v1 import State
from cmk.gui.i18n import _
from cmk.gui.plugins.wato.utils import (
rulespec_registry,
HostRulespec,
CheckParameterRulespecWithItem,
RulespecGroupCheckParametersNetworking,
)
from cmk.gui.watolib.rulespecs import Rulespec
from cmk.gui.valuespec import (
Dictionary,
Integer,
ListOfStrings,
DropdownChoice,
Tuple,
ListOf,
TextInput
)
def _valuespec_special_agents_domains_nameservers_query():
return Dictionary(
title=_("Domains Nameservers Query"),
required_keys=["domains"],
elements=[
(
"domains",
ListOfStrings(
title=_("Domain names"),
help=_("List of domain names to check"),
allow_empty=False,
),
),
]
)
def _valuespec_special_agents_domains_nameservers_checks():
return Dictionary(
title=_("Domains Nameservers Checks"),
required_keys=["domain_nameservers", "alert_level"],
elements=[
(
"domain_nameservers",
ListOf(
valuespec=Dictionary(
required_keys=["domains", "nameservers"],
elements=[
(
"domains",
ListOfStrings(
title=_("Domain names"),
help=_("List of domain names the below nameservers apply to"),
allow_empty=False,
)
),
(
"nameservers",
ListOfStrings(
title=_("Nameservers"),
help=_("List of nameservers that all of the above domain names should have"),
allow_empty=False,
)
),
]
)
)
),
(
"alert_level",
DropdownChoice(
title=_("Alert level used on mismatch"),
help=_("Alert level used when there is a mismatch in domain name servers for a domain"),
default_value=State.WARN.value,
choices=[
(State.CRIT.value, _(State.CRIT.name)),
(State.WARN.value, _(State.WARN.name)),
(State.OK.value, _(State.OK.name)),
],
),
),
],
)
rulespec_registry.register(
HostRulespec(
name="special_agents:domains_nameservers",
group=RulespecGroupCheckParametersNetworking,
match_type='dict',
valuespec=_valuespec_special_agents_domains_nameservers_query,
)
)
rulespec_registry.register(
CheckParameterRulespecWithItem(
check_group_name="domains_nameservers",
group=RulespecGroupCheckParametersNetworking,
match_type="dict",
parameter_valuespec=_valuespec_special_agents_domains_nameservers_checks,
item_spec=lambda: TextInput(title=_("Nameserver")),
title=lambda: _("Domains Nameservers Checks"),
)
)