Add plugin that performs various checks on domain names.

This commit is contained in:
Marsell Kukuljevic 2025-04-09 18:50:46 +02:00
parent de64c02488
commit a37a8c33b1
9 changed files with 395 additions and 0 deletions

Binary file not shown.

View File

@ -0,0 +1,80 @@
#!/usr/bin/env python3
# Copyright (C) 2025 Spearhead Systems SRL
import datetime
from cmk.base.plugins.agent_based.agent_based_api.v1 import register, Result, Service, State
# Incoming agent output takes the form:
#
# google.com:
# 2028-09-14
# yahoo.com:
# 2026-01-19
#
# The domains being queries end with a colon. The expiry response does not.
#
# Return a dictionary which uses domain as key, and returns an expiry as
# datetime.date.
def parse_domains_expiry(string_table):
expiries = {}
current_domain = ""
for line in string_table:
line = line[0]
domain = line[:-1]
ending = line[-1]
if ending == ":":
current_domain = domain
else:
date = datetime.datetime.strptime(line, '%Y-%m-%d').date()
expiries[current_domain] = date
return expiries
def discover_domains_expiry(section):
for domain, date in section.items():
yield Service(item=domain)
def check_domains_expiry(item, params, section):
expiry = section.get(item)
if not expiry:
yield Result(state=State.WARN, summary="Expiry not found in whois")
return
alert_delta = params.get("days_remaining")
if not alert_delta:
yield Result(state=State.WARN, summary="No expiry check rule configured")
return
valid_days_left = (expiry - datetime.date.today()).days
summary = "Domain expires in %s days" % valid_days_left
state = State.OK
if valid_days_left < alert_delta[1]:
state = State.CRIT
elif valid_days_left < alert_delta[0]:
state = State.WARN
yield Result(state=state, summary=summary)
register.agent_section(
name="domains_expiry",
parse_function=parse_domains_expiry
)
register.check_plugin(
name="domains_expiry",
service_name="Expiry for Domain '%s'",
discovery_function=discover_domains_expiry,
check_function=check_domains_expiry,
check_default_parameters={},
check_ruleset_name="domains_expiry",
)

View File

@ -0,0 +1,100 @@
#!/usr/bin/env python3
# Copyright (C) 2025 Spearhead Systems SRL
from cmk.base.plugins.agent_based.agent_based_api.v1 import register, Result, Service, State
# Incoming agent output takes the form:
#
# google.com:
# ns3.google.com.
# ns1.google.com.
# ns4.google.com.
# ns2.google.com.
# yahoo.com:
# ns2.yahoo.com.
# ns1.yahoo.com.
# ns3.yahoo.com.
# ns4.yahoo.com.
# ns5.yahoo.com.
#
# The domains being queries end with a colon. The nameserver responses end with
# a period. Nameservers follow the domain being queried.
#
# Return a dictionary which uses domain as key, and returns a list of associated
# nameservers for that key.
def parse_domains_nameservers(string_table):
nameservers = {}
current_domain = ""
for line in string_table:
domain = line[0][:-1]
ending = line[0][-1]
if ending == ":":
current_domain = domain
nameservers[current_domain] = []
elif ending == ".":
nameservers[current_domain].append(domain)
return nameservers
def discover_domains_nameservers(section):
for domain, nameservers in section.items():
yield Service(item=domain)
def check_domains_nameservers(item, params, section):
nameservers = section.get(item)
if not nameservers:
yield Result(state=State.WARN, summary="Nameservers missing")
return
configs = params.get("domain_nameservers")
if not configs:
yield Result(state=State.WARN, summary="No nameservers check rule configured")
return
alert_level = params["alert_level"]
nameservers.sort()
domain_found = False
for config in configs:
domains = config["domains"]
if item in domains:
domain_found = True
expected_nameservers = config["nameservers"]
expected_nameservers.sort()
if expected_nameservers == nameservers:
yield Result(state=State.OK, summary="Expected nameservers present")
else:
expected_str = ", ".join(expected_nameservers)
found_str = ", ".join(nameservers)
yield Result(state=State(alert_level), summary="Mismatch in nameservers. Expected: [%s], Found: [%s]" % (expected_str, found_str))
if not domain_found:
yield Result(state=State.WARN, summary="Domain not found in any nameserver check rule")
register.agent_section(
name="domains_nameservers",
parse_function=parse_domains_nameservers
)
register.check_plugin(
name="domains_nameservers",
service_name="Nameservers for Domain '%s'",
discovery_function=discover_domains_nameservers,
check_function=check_domains_nameservers,
check_default_parameters={},
check_ruleset_name="domains_nameservers",
)

View File

@ -0,0 +1,8 @@
#!/bin/bash
# Copyright (C) 2025 Spearhead Systems SRL
echo "<<<domains_expiry:sep(0)>>>"
for domain in "$@"; do
echo "$domain:"
whois "$domain" | grep 'Expir.*' | head -1 | grep -Eo '[0-9]{4}-[0-9]{2}-[0-9]{2}'
done

View File

@ -0,0 +1,8 @@
#!/bin/bash
# Copyright (C) 2025 Spearhead Systems SRL
echo "<<<domains_nameservers:sep(0)>>>"
for domain in "$@"; do
echo "$domain:"
dig +short NS "$domain"
done

View File

@ -0,0 +1,7 @@
#!/usr/bin/env python3
# Copyright (C) 2025 Spearhead Systems SRL
def agent_domains_expiry_args(params, hostname, ipaddress):
return params["domains"]
special_agent_info["domains_expiry"] = agent_domains_expiry_args

View File

@ -0,0 +1,7 @@
#!/usr/bin/env python3
# Copyright (C) 2025 Spearhead Systems SRL
def agent_domains_nameservers_args(params, hostname, ipaddress):
return params["domains"]
special_agent_info["domains_nameservers"] = agent_domains_nameservers_args

View File

@ -0,0 +1,82 @@
#!/usr/bin/env python3
# Copyright (C) 2025 Spearhead Systems SRL
import copy
from cmk.base.plugins.agent_based.agent_based_api.v1 import State
from cmk.gui.i18n import _
from cmk.gui.plugins.wato.utils import (
rulespec_registry,
HostRulespec,
RulespecGroupCheckParametersNetworking,
)
from cmk.gui.watolib.rulespecs import Rulespec
from cmk.gui.valuespec import (
Dictionary,
Integer,
ListOfStrings,
Tuple,
)
def _valuespec_special_agents_domains_expiry_query():
return Dictionary(
title=_("Domains Expiry Query"),
required_keys=["domains"],
elements=[
(
"domains",
ListOfStrings(
title=_("Domain names"),
help=_("List of domain names to check"),
allow_empty=False,
),
),
]
)
def _valuespec_special_agents_domains_expiry_checks():
return Dictionary(
title=_("Domains Expiry"),
required_keys=["days_remaining"],
elements=[
(
"days_remaining",
Tuple(
title=_("Days Remaining"),
help=_("If there are fewer days until one of the above domains expires, issue an alert"),
elements=[
Integer(
title=_("Warn if fewer days than"),
minvalue=0,
default_value=30
),
Integer(
title=_("Crit if fewer days than"),
minvalue=0,
default_value=7
)
]
)
),
],
)
rulespec_registry.register(
HostRulespec(
name="special_agents:domains_expiry",
group=RulespecGroupCheckParametersNetworking,
match_type='dict',
valuespec=_valuespec_special_agents_domains_expiry_query,
)
)
rulespec_registry.register(
CheckParameterRulespecWithItem(
check_group_name="domains_expiry",
group=RulespecGroupCheckParametersNetworking,
match_type="dict",
parameter_valuespec=_valuespec_special_agents_domains_expiry_checks,
item_spec=lambda: TextInput(title=_("Expiry")),
title=lambda: _("Domains Expiry Checks"),
)
)

View File

@ -0,0 +1,103 @@
#!/usr/bin/env python3
# Copyright (C) 2025 Spearhead Systems SRL
import copy
from cmk.base.plugins.agent_based.agent_based_api.v1 import State
from cmk.gui.i18n import _
from cmk.gui.plugins.wato.utils import (
rulespec_registry,
HostRulespec,
RulespecGroupCheckParametersNetworking,
)
from cmk.gui.watolib.rulespecs import Rulespec
from cmk.gui.valuespec import (
Dictionary,
Integer,
ListOfStrings,
DropdownChoice,
Tuple,
ListOf,
)
def _valuespec_special_agents_domains_nameservers_query():
return Dictionary(
title=_("Domains Nameservers Query"),
required_keys=["domains"],
elements=[
(
"domains",
ListOfStrings(
title=_("Domain names"),
help=_("List of domain names to check"),
allow_empty=False,
),
),
]
)
def _valuespec_special_agents_domains_nameservers_checks():
return Dictionary(
title=_("Domains Nameservers Checks"),
required_keys=["domain_nameservers", "alert_level"],
elements=[
(
"domain_nameservers",
ListOf(
valuespec=Dictionary(
required_keys=["domains", "nameservers"],
elements=[
(
"domains",
ListOfStrings(
title=_("Domain names"),
help=_("List of domain names the below nameservers apply to"),
allow_empty=False,
)
),
(
"nameservers",
ListOfStrings(
title=_("Nameservers"),
help=_("List of nameservers that all of the above domain names should have"),
allow_empty=False,
)
),
]
)
)
),
(
"alert_level",
DropdownChoice(
title=_("Alert level used on mismatch"),
help=_("Alert level used when there is a mismatch in domain name servers for a domain"),
default_value=State.WARN.value,
choices=[
(State.CRIT.value, _(State.CRIT.name)),
(State.WARN.value, _(State.WARN.name)),
(State.OK.value, _(State.OK.name)),
],
),
),
],
)
rulespec_registry.register(
HostRulespec(
name="special_agents:domains_nameservers",
group=RulespecGroupCheckParametersNetworking,
match_type='dict',
valuespec=_valuespec_special_agents_domains_nameservers_query,
)
)
rulespec_registry.register(
CheckParameterRulespecWithItem(
check_group_name="domains_nameservers",
group=RulespecGroupCheckParametersNetworking,
match_type="dict",
parameter_valuespec=_valuespec_special_agents_domains_nameservers_checks,
item_spec=lambda: TextInput(title=_("Nameserver")),
title=lambda: _("Domains Nameservers Checks"),
)
)