Add Triton Wedge detector.

This commit is contained in:
Marsell Kukuljevic 2025-04-01 20:07:06 +02:00
parent 1b4fafb15e
commit 0833ae7a16
5 changed files with 278 additions and 0 deletions
check_mk-wedge
local
lib/check_mk/base/plugins/agent_based
share/check_mk
agents/special
checks
web/plugins/wato
triton_wedge-0.1.0.mkp

@ -0,0 +1,64 @@
#!/usr/bin/env python3
#
# Parses and checks external VM IPs.
import json
from cmk.base.plugins.agent_based.agent_based_api.v1 import register, Result, Service, State
def parse_triton_wedge(string_table):
lookup = {}
for row in string_table:
nic = json.loads(row[0])
cn_name = nic["cn"]
vms_in_cn = lookup.setdefault(cn_name, [])
vms_in_cn.append(nic)
return lookup
register.agent_section(
name="triton_wedge",
parse_function=parse_triton_wedge
)
def discover_triton_wedge(section):
for cn_name, vms in sorted(section.items()):
yield Service(item=cn_name, parameters={"name": cn_name})
def check_triton_wedge(item, params, section):
cn_name = params["name"]
vms = section.get(cn_name)
if vms is None:
yield Result(state=State.WARN, summary="Not appearing in NAPI")
return
wedged_vms = []
for vm in vms:
if vm["wedged"]:
wedged_vms.append(vm)
if len(wedged_vms) == 0:
yield Result(state=State.OK, summary="No wedge detected")
elif len(wedged_vms) == 1:
vm = wedged_vms[0]
summary = "Potential wedge detected for VM %s (%s)" % (vm["vm"], vm["ip"])
yield Result(state=State.WARN, summary=summary)
else:
lst = ", ".join(map(lambda vm: "VM %s (%s)" % (vm["vm"], vm["ip"]), wedged_vms))
yield Result(state=State.CRIT, summary=f"Likely wedged detected for {lst}")
register.check_plugin(
name="triton_wedge",
service_name="Triton Wedge CN %s",
discovery_function=discover_triton_wedge,
check_function=check_triton_wedge,
check_default_parameters={},
check_ruleset_name="triton_wedge",
)

@ -0,0 +1,165 @@
#!/usr/bin/env python3
# The range of ephemeral local ports we use when attempting to probe remote
# IPs. In the past, wedged ports appeared with a stride of 8; to be safe, we use
# a stride of 128.
PORT_RANGE_START = 57000
PORT_RANGE_END = 57128
CONNECT_RETRIES = 3
CHECK_REMOTE_PORTS = [443, 80]
CONCURRENT_SCANS = 200
import urllib.request, sys, argparse, asyncio, json, socket, errno
def get_url(url):
request = urllib.request.Request(url)
with urllib.request.urlopen(request) as conn:
data = conn.read()
return data
# Fetch and parse details about active zone NICs on the external network.
def query_napi(addr):
url = 'http://%s/nics?nic_tag=external&belongs_to_type=zone&state=running' % addr
try:
json_data = get_url(url)
nics = json.loads(json_data)
return nics
except urllib.error.HTTPError as e:
sys.stderr.write("NAPI error: %s\n" % e)
# asyncio provides some nice connection methods, but none of them allow us to
# use SO_REUSEPORT. This flag is critical since we're repeatedly using the
# same range of local ports to port map remote IPs. So we have to resort to
# this low-level socket hackery to enable SO_REUSEPORT.
async def async_connect(src, dest):
loop = asyncio.get_event_loop()
sd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
sd.bind(src)
sd.setblocking(False)
connected = False
# We try to connect() several times, in case a packet got lost.
for attempt in range(CONNECT_RETRIES):
try:
future = loop.sock_connect(sd, dest)
await asyncio.wait_for(future, timeout=0.1)
connected = True
break
except ConnectionRefusedError:
# ECONNREFUSED (we received a RST after sending an ACK). If we
# receive this there's no point retrying.
break
except (TimeoutError, asyncio.TimeoutError):
# Usually you'd wait for the TCP stack to make its own retries, but
# we know our target IPs are in a nearby rack, so we don't want to
# wait that long. Ergo we do our own trying, with a fast timeout.
# If we hit here, a packet might have been lost, so try again.
pass
except OSError as e:
if e.errno == errno.EHOSTUNREACH:
# If there is no route, no point retrying either.
break
else:
raise
sd.close()
return connected
# Check for a wedge on a NIC. We detect a wedge by doing the following:
#
# Us (local IP, local port) -----> Them (remote IP, remote port)
#
# 1. Find an open remote port (to speed things up we check ports 443 and 80)
# 2. Repeatedly connect() to the remote port while incrementing our local port
# 3. If we find a local port that fails to connect, this may be a wedge
#
async def check_for_wedge(nic, semaphore):
local_ip = "0.0.0.0"
remote_ip = nic["ip"]
can_connect = False
result = {
"cn": nic["cn_uuid"],
"vm": nic["belongs_to_uuid"],
"ip": nic["ip"],
"wedged": False
}
async with semaphore:
# To speed things up, we only check ports 443 and 80, which are the
# most common ports on the Internet.
for remote_port in CHECK_REMOTE_PORTS:
if can_connect:
break
for local_port in range(PORT_RANGE_START, PORT_RANGE_END):
src = (local_ip, local_port)
dest = (remote_ip, remote_port)
connected = await async_connect(src, dest)
if can_connect and not connected:
result["wedged"] = True
return result
elif connected:
can_connect = True
return result
# Given an array of nics, scan the ports on each nic's IP address, checking if
# any appear to be wedged.
async def scan(nics):
sem = asyncio.Semaphore(CONCURRENT_SCANS)
tasks = map(lambda nic: asyncio.create_task(check_for_wedge(nic, sem)), nics)
done, pending = await asyncio.wait(tasks)
return map(lambda f: f.result(), done)
# Print out all our results in a format that CheckMK understands. Most of our
# output are in JSON rows.
def print_out(scan_results, agent_name):
sys.stdout.write(f"<<<{agent_name}:sep(0)>>>\n")
scan_results = list(scan_results)
scan_results.sort(key=lambda d: d["cn"] + d["vm"] + d["ip"])
for entry in scan_results:
sys.stdout.write("%s\n" % json.dumps(entry))
# Parse the command-line arguments, specifically for hostname. Print out help
# to console if we get no args.
def parse_arguments(argv):
parser = argparse.ArgumentParser()
parser.add_argument(
"hostname", metavar="HOSTNAME", help="Hostname of NAPI to query."
)
return parser.parse_args(argv)
# Parse args, contact NAPI, query external IPs for VMs, and then print results
def main(argv=None):
if argv is None:
argv = sys.argv[1:]
args = parse_arguments(argv)
nics = query_napi(args.hostname)
# Sort the IPs so that (tend) to scan them in relative order. This is to
# increase the time between scans to the same IP due to consecutive agent
# executions, otherwise there's a higher chance we bump into TIME_WAIT.
#nics.sort(key=lambda d: d["ip"])
scan_results = asyncio.run(scan(nics))
print_out(scan_results, "triton_wedge")
if __name__ == "__main__":
sys.exit(main())

@ -0,0 +1,7 @@
#!/usr/bin/env python3
def agent_triton_wedge(params, hostname, ipaddress):
return [params["instance"]]
special_agent_info["triton_wedge"] = agent_triton_wedge

@ -0,0 +1,42 @@
#!/usr/bin/env python3
#
# GUI config page for triton_wedge.
from cmk.gui.i18n import _
from cmk.gui.plugins.wato.utils import (
rulespec_registry,
HostRulespec,
RulespecGroupCheckParametersHardware
)
from cmk.gui.watolib.rulespecs import Rulespec
from cmk.gui.valuespec import (
Dictionary,
Hostname,
)
def _valuespec_special_agents_triton_wedge():
return Dictionary(
title=_("Triton Wedge Detection"),
help=_(""),
elements=[
(
"instance",
Hostname(
title=_("Hostname"),
help=_("Hostname or IP of NAPI to query"),
allow_empty=False,
),
),
],
)
rulespec_registry.register(
HostRulespec(
factory_default=Rulespec.FACTORY_DEFAULT_UNUSED,
name="special_agents:triton_wedge",
group=RulespecGroupCheckParametersHardware,
valuespec=_valuespec_special_agents_triton_wedge,
)
)

Binary file not shown.