Telegraf: dynamically adding custom tags

Srijan Choudhary Srijan Choudhary
- 2 min read

Background

For a recent project, I wanted to add a custom tag to data coming in from a built-in input plugin for telegraf.

The input plugin was the procstat plugin, and the custom data was information from pacemaker (a clustering solution for linux). I wanted to add a tag indicating if the current host was the "active" host in my active/passive setup.

For this, the best solution I came up with was to use a recently released execd processor plugin for telegraf.

How it works

The execd processor plugin runs an external program as a separate process and pipes metrics in to the process's STDIN and reads processed metrics from its STDOUT.

Telegraf plugins interaction diagram
Telegraf plugins interaction. View Source

Telegraf's filtering parameters can be used to select or limit data from which input plugins will go to this processor.

The external program

The external program I wrote does the following:

  1. Get pacemaker status and cache it for 10 seconds
  2. Read a line from stdin
  3. Append the required information as a tag in the data
  4. Write it to stdout

The caching is just an optimization - it was more to do with decreasing polluting the logs than actual speed improvements.

Also, I've done the Influxdb lineprotocol parsing in my code directly (because my usecase is simple), but this can be substituted by an actual library meant for handling lineprotocol.

#!/usr/bin/python

from __future__ import print_function
from sys import stderr
import fileinput
import subprocess
import time

cache_value = None
cache_time = 0
resource_name = "VIP"

def get_crm_status():
    global cache_value, cache_time, resource_name
    ctime = time.time()
    if ctime - cache_time > 10:
        # print("Cache busted", file=stderr)
        try:
            crm_node = subprocess.check_output(["sudo", "/usr/sbin/crm_node", "-n"]).rstrip()
            crm_resource = subprocess.check_output(["sudo", "/usr/sbin/crm_resource", "-r", resource_name, "-W"]).rstrip()
            active_node = crm_resource.split(" ")[-1]
            if active_node == crm_node:
                cache_value = "active"
            else:
                cache_value = "inactive"
        except (OSError, IOError) as e:
            print("Exception: %s" % e, file=stderr)
            # Don't report active/inactive if crm commands are not found
            cache_value = None
        except Exception as e:
            print("Exception: %s" % e, file=stderr)
            # Report as inactive in other cases by default
            cache_value = "inactive"
        cache_time = ctime
    return cache_value

def lineprotocol_add_tag(line, key, value):
    first_comma = line.find(",")
    first_space = line.find(" ")
    if first_comma >= 0 and first_comma <= first_space:
        split_str = ","
    else:
        split_str = " "
    parts = line.split(split_str)
    first, rest = parts[0], parts[1:]
    first_new = first + "," + key + "=" + value
    return split_str.join([first_new] + rest)

for line in fileinput.input():
    line = line.rstrip()
    crm_status = get_crm_status()
    if crm_status:
        try:
            new_line = lineprotocol_add_tag(line, "crm_status", crm_status)
        except Exception as e:
            print("Exception: %s, Input: %s" % (e, line), file=stderr)
            new_line = line
    else:
        new_line = line

    print(new_line)
pacemaker_status.py

Telegraf configuration

Here's a sample telegraf configuration that routes data from "system" plugin to execd processor plugin, and finally outputs to influxdb.

[agent]
  interval = "30s"

[[inputs.cpu]]

[[inputs.system]]

[[processors.execd]]
  command = ["/usr/bin/python", "/etc/telegraf/scripts/pacemaker_status.py"]
  namepass = ["system"]

[[outputs.influxdb]]
  urls = ["http://127.0.0.1:8086"]
  database = "telegraf"
telegraf.conf

Other types of dynamic tags

In this example, we wanted to get the value of the tag from an external program. If the tag can be calculated from the incoming data itself, then things are much simpler. There are a lot of processor plugins, and many things can be achieved using just those.

Interactions