Allow Graylog metrics plugin to use its own custom EWMA windows.

This commit is contained in:
Marsell Kukuljevic 2026-03-06 09:31:02 +01:00
parent 8cd662e2bc
commit 0c882e643c
5 changed files with 148 additions and 38 deletions

Binary file not shown.

View File

@ -15,6 +15,8 @@
# "rs_m15_rate": 163.80530659055356}}
import json
import time
import math
from cmk.agent_based.v2 import (
Result,
Service,
@ -23,11 +25,10 @@ from cmk.agent_based.v2 import (
CheckPlugin,
AgentSection,
render,
get_value_store,
)
def parse_graylog_input_metrics(section):
if not section:
return {}
@ -48,15 +49,91 @@ def render_msgs(num_msgs):
check_configs = [
("im_m1_rate", render_msgs, "Incoming messages/sec 1m"),
("im_m5_rate", render_msgs, "5m"),
("im_m15_rate", render_msgs, "15m"),
("rs_m1_rate", render.bytes, "Incoming bytes/sec 1m"),
("rs_m5_rate", render.bytes, "5m"),
("rs_m15_rate", render.bytes, "15m"),
("im", "small", render_msgs, "Incoming msgs/sec small window"),
("im", "medium", render_msgs, "Incoming msgs/sec medium window"),
("im", "large", render_msgs, "Incoming msgs/sec large window"),
("rs", "small", render.bytes, "Incoming bytes/sec small window"),
("rs", "medium", render.bytes, "Incoming bytes/sec medium window"),
("rs", "large", render.bytes, "Incoming bytes/sec large window"),
]
# Since Graylog only provides 1, 5 and 15 minute windows, when we're attempting
# to (semi-)reconstruct raw values we choose the best Graylog window to use.
# It's a balance between the delay and dilution in each window. We're trying
# to pick the Graylog window with the strongest signal.
def determine_metric_input(store, window_length):
    """Pick which native Graylog rate window (1, 5 or 15) to read from.

    Args:
        store: The check's value store. Accepted for interface
            compatibility only; it is not consulted. An earlier revision
            derived a "barrier" from the stored timestamp, but that value
            mixed seconds with minutes and was never used in the decision,
            so it has been dropped.
        window_length: The user-configured window length, in minutes.

    Returns:
        1 when ``window_length`` is below 2.5, 5 when it is below 10,
        otherwise 15.
    """
    if window_length < 2.5:
        return 1
    if window_length < 10:
        return 5
    return 15
# We take two window lengths: the window length the user specified in the
# check rule (which can be an arbitrary length), and Graylog's native window
# (which is 1, 5 or 15 minutes). We then attempt to pull out the most recent
# value by effectively reversing the exponentially-weighted moving average
# (EWMA) window math on the Graylog window, and then creating our own EWMA
# value on our own window size.
#
# The rest of the logic is there for the various gotchas that come with these
# two transforms.
def calculate_ewma(store, window_name, window_length, graylog_window_length, newest_value):
    """Re-window a Graylog EWMA rate onto a custom window length.

    The newest Graylog reading is "un-averaged" to estimate the raw rate
    since the previous call, and that estimate is then folded into our own
    EWMA that uses ``window_length``.

    Args:
        store: Mutable mapping used to persist state between calls. The keys
            ``{window_name}_timestamp``, ``{window_name}_graylog`` and
            ``{window_name}_ewma`` are read and written; ``timestamp`` is
            written as well.
        window_name: Prefix that namespaces this window's entries in ``store``.
        window_length: Length of our own EWMA window, in minutes.
        graylog_window_length: Length of the Graylog window that produced
            ``newest_value``, in minutes.
        newest_value: The latest rate reported by Graylog.

    Returns:
        The updated EWMA, clamped so it is never negative. On the first call
        for a window (or when prior state is incomplete) ``newest_value`` is
        returned as-is. If no time has elapsed since the previous call, the
        previous EWMA is returned unchanged.
    """
    now = time.time()
    timestamp_key = f"{window_name}_timestamp"
    ewma_key = f"{window_name}_ewma"
    graylog_key = f"{window_name}_graylog"

    # The timestamp is tracked per window. A single shared key would be
    # overwritten by whichever window was processed just before this one in
    # the same check cycle, shrinking the delta to microseconds.
    previous_timestamp = store.get(timestamp_key)
    previous_ewma = store.get(ewma_key)
    previous_graylog = store.get(graylog_key)

    # We need to store both the time delta and the prior EWMA values for both
    # Graylog and our own window so the math can work -- we're adding new
    # values to a moving *average* after all.
    store[timestamp_key] = now
    # The shared key is still written so existing readers of it keep working.
    store["timestamp"] = now
    store[graylog_key] = newest_value

    if previous_ewma is None:
        # Provide a seed on first run to speed up convergence.
        store[ewma_key] = newest_value
        return newest_value

    if previous_timestamp is None or previous_graylog is None:
        return newest_value

    # Both our window and Graylog's window are in minutes, not seconds, so
    # the delta is converted to minutes too.
    time_delta = (now - previous_timestamp) / 60.0

    # No elapsed time (or a clock that stepped backwards) carries no new
    # information and would make the inversion below divide by zero, so keep
    # the previous average.
    if time_delta <= 0:
        return previous_ewma if previous_ewma >= 0 else 0

    # Reverse Graylog's EWMA. expm1 yields an accurate, non-zero 1 - alpha
    # even when the delta is tiny compared to the window.
    raw_exponent = -time_delta / graylog_window_length
    raw_alpha = math.exp(raw_exponent)
    raw_weight = -math.expm1(raw_exponent)
    raw_value = (newest_value - raw_alpha * previous_graylog) / raw_weight

    # Since the above transform magnifies noise, we need to clamp here since
    # the noise can cause us to drop below zero.
    if raw_value < 0:
        raw_value = 0

    # Create our own EWMA
    ewma_alpha = math.exp(-time_delta / window_length)
    ewma = ewma_alpha * previous_ewma + (1 - ewma_alpha) * raw_value
    store[ewma_key] = ewma

    # Another clamp, although probably not necessary.
    return ewma if ewma >= 0 else 0
def check_graylog_input_metrics(item, params, section):
item_id = item.split()[-1][1:-1]
input_info = section.get(item_id)
@ -75,16 +152,35 @@ def check_graylog_input_metrics(item, params, section):
if input_info["input_port"]:
yield Result(state=State.OK, summary="Port: %s" % input_info["input_port"])
for metric_name, render_func, label in check_configs:
value = input_info.get(metric_name)
store = get_value_store()
for prefix, window_name, render_func, label in check_configs:
metric_name = f"{prefix}_{window_name}_rate"
config = params.get(metric_name, {})
window_length = config.get("window")
levels_upper = config.get("upper")
levels_lower = config.get("lower")
if window_length is None:
continue
graylog_window_length = determine_metric_input(store, window_length)
metric_input = f"{prefix}_m{graylog_window_length}_rate"
value = input_info.get(metric_input)
if value is None:
continue
levels_upper = params.get(metric_name, {}).get("upper")
levels_lower = params.get(metric_name, {}).get("lower")
# Since Graylog natively gives us 1m, 5m, 15m, there's no need to
# crunch math for them; we just pass those values through.
if window_length in [1, 5, 15]:
ewma_value = value
else:
ewma_value = calculate_ewma(store, metric_name, window_length, graylog_window_length, value)
yield from check_levels(
value,
ewma_value,
levels_upper = levels_upper,
levels_lower = levels_lower,
metric_name = metric_name,

View File

@ -7,40 +7,40 @@ UNIT_BYTES = metrics.Unit(metrics.IECNotation("bytes/sec"))
UNIT_MSGS = metrics.Unit(metrics.IECNotation("msgs/sec"))
metric_graylog_input_metrics_im_m1_rate = metrics.Metric(
title = Title("Incoming messages/sec (1 min)"),
name = "im_m1_rate",
# Incoming message rate over the "small" window. This is a message count per
# second, so it uses the msgs/sec unit rather than the bytes/sec one.
metric_graylog_input_metrics_im_small_rate = metrics.Metric(
    title = Title("Incoming messages/sec (small window)"),
    name = "im_small_rate",
    unit = UNIT_MSGS,
    color = metrics.Color.LIGHT_GREEN,
)
metric_graylog_input_metrics_im_m5_rate = metrics.Metric(
title = Title("Incoming messages/sec (5 min)"),
name = "im_m5_rate",
# Incoming message rate over the "medium" window. This is a message count per
# second, so it uses the msgs/sec unit rather than the bytes/sec one.
metric_graylog_input_metrics_im_medium_rate = metrics.Metric(
    title = Title("Incoming messages/sec (medium window)"),
    name = "im_medium_rate",
    unit = UNIT_MSGS,
    color = metrics.Color.GREEN,
)
metric_graylog_input_metrics_im_m15_rate = metrics.Metric(
title = Title("Incoming messages/sec (15 min)"),
name = "im_m15_rate",
# Incoming message rate over the "large" window. This is a message count per
# second, so it uses the msgs/sec unit rather than the bytes/sec one.
metric_graylog_input_metrics_im_large_rate = metrics.Metric(
    title = Title("Incoming messages/sec (large window)"),
    name = "im_large_rate",
    unit = UNIT_MSGS,
    color = metrics.Color.DARK_GREEN,
)
metric_graylog_input_metrics_rs_m1_rate = metrics.Metric(
title = Title("Incoming bytes/sec (1 min)"),
name = "rs_m1_rate",
# Incoming byte rate over the "small" window. This is a byte count per
# second, so it uses the bytes/sec unit rather than the msgs/sec one.
metric_graylog_input_metrics_rs_small_rate = metrics.Metric(
    title = Title("Incoming bytes/sec (small window)"),
    name = "rs_small_rate",
    unit = UNIT_BYTES,
    color = metrics.Color.LIGHT_BLUE,
)
metric_graylog_input_metrics_rs_m5_rate = metrics.Metric(
title = Title("Incoming bytes/sec (5 min)"),
name = "rs_m5_rate",
# Incoming byte rate over the "medium" window. This is a byte count per
# second, so it uses the bytes/sec unit rather than the msgs/sec one.
metric_graylog_input_metrics_rs_medium_rate = metrics.Metric(
    title = Title("Incoming bytes/sec (medium window)"),
    name = "rs_medium_rate",
    unit = UNIT_BYTES,
    color = metrics.Color.BLUE,
)
metric_graylog_input_metrics_rs_m15_rate = metrics.Metric(
title = Title("Incoming bytes/sec (15 min)"),
name = "rs_m15_rate",
# Incoming byte rate over the "large" window. This is a byte count per
# second, so it uses the bytes/sec unit rather than the msgs/sec one.
metric_graylog_input_metrics_rs_large_rate = metrics.Metric(
    title = Title("Incoming bytes/sec (large window)"),
    name = "rs_large_rate",
    unit = UNIT_BYTES,
    color = metrics.Color.DARK_BLUE,
)

View File

@ -1,9 +1,11 @@
# Copyright (C) 2026 Spearhead Systems SRL
from cmk.rulesets.v1.form_specs.validators import NumberInRange
from cmk.rulesets.v1.form_specs import (
Dictionary,
DictElement,
Float,
Integer,
DefaultValue,
LevelDirection,
SimpleLevels,
@ -29,7 +31,7 @@ titles = {
}
def _rate(metric, minutes, level):
def _rate(metric, window, level):
unit_name = titles[metric]
def_value = default_levels[level]
@ -38,7 +40,7 @@ def _rate(metric, minutes, level):
title = Title(f"{level.value.capitalize()} level"),
level_direction = level,
form_spec_template = Float(
title = Title(f"{unit_name}/{minutes}min"),
title = Title(f"{unit_name} {window} time window"),
),
prefill_fixed_levels = DefaultValue(value=(def_value, def_value))
),
@ -49,13 +51,25 @@ def _parameter_valuespec_graylog_input_metrics():
elements = {}
for metric in ["im", "rs"]:
for minutes in [1, 5, 15]:
elements[f"{metric}_m{minutes}_rate"] = DictElement(
for window, default in [("small", 1), ("medium", 5), ("large", 15)]:
elements[f"{metric}_{window}_rate"] = DictElement(
parameter_form = Dictionary(
title = Title(f"Incoming {titles[metric]} for past {minutes} minute(s)"),
title = Title(f"Incoming {titles[metric]} over {window} time window"),
elements = {
"upper": _rate(metric, minutes, LevelDirection.UPPER),
"lower": _rate(metric, minutes, LevelDirection.LOWER)
"window": DictElement(
required = True,
parameter_form = Integer(
title = Title("Minutes"),
help_text = Help(
"Set how many minutes this Range Weighted Moving Average "
"window should use."
),
prefill = DefaultValue(default),
custom_validate = (NumberInRange(min_value=1),),
),
),
"upper": _rate(metric, window, LevelDirection.UPPER),
"lower": _rate(metric, window, LevelDirection.LOWER),
}
)
)