For a recent project, I wanted to add a custom tag to data coming in from one of Telegraf's built-in input plugins.
The input plugin was the procstat plugin, and the custom data was information from Pacemaker (a clustering solution for Linux). I wanted to add a tag indicating whether the current host was the “active” host in my active/passive setup.
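Telegraf's execd processor plugin makes this possible. It runs an external program as a separate process, pipes metrics into the process’s STDIN, and reads the processed metrics back from its STDOUT. The script below is the external program I used; it expects metrics in line protocol format and adds a crm_status tag to each one.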
#!/usr/bin/python
from __future__ import print_function
from sys import stderr, stdout
import fileinput
import subprocess
import time

# The Pacemaker lookup is cached so the crm commands run at most once every 10 seconds.
cache_value = None
cache_time = 0
resource_name = "VIP"


def get_crm_status():
    global cache_value, cache_time, resource_name
    ctime = time.time()
    if ctime - cache_time > 10:
        # print("Cache busted", file=stderr)
        try:
            # universal_newlines=True returns text rather than bytes on Python 3
            crm_node = subprocess.check_output(
                ["sudo", "/usr/sbin/crm_node", "-n"],
                universal_newlines=True).rstrip()
            crm_resource = subprocess.check_output(
                ["sudo", "/usr/sbin/crm_resource", "-r", resource_name, "-W"],
                universal_newlines=True).rstrip()
            # The last word of the crm_resource output is the node running the resource
            active_node = crm_resource.split(" ")[-1]
            if active_node == crm_node:
                cache_value = "active"
            else:
                cache_value = "inactive"
        except (OSError, IOError) as e:
            print("Exception: %s" % e, file=stderr)
            # Don't report active/inactive if crm commands are not found
            cache_value = None
        except Exception as e:
            print("Exception: %s" % e, file=stderr)
            # Report as inactive in other cases by default
            cache_value = "inactive"
        cache_time = ctime
    return cache_value


def lineprotocol_add_tag(line, key, value):
    # Insert the new tag right after the measurement name, which ends at
    # the first comma (tags follow) or the first space (no tags).
    first_comma = line.find(",")
    first_space = line.find(" ")
    if first_comma >= 0 and first_comma <= first_space:
        split_str = ","
    else:
        split_str = " "
    parts = line.split(split_str)
    first, rest = parts[0], parts[1:]
    first_new = first + "," + key + "=" + value
    return split_str.join([first_new] + rest)


for line in fileinput.input():
    line = line.rstrip()
    crm_status = get_crm_status()
    if crm_status:
        try:
            new_line = lineprotocol_add_tag(line, "crm_status", crm_status)
        except Exception as e:
            print("Exception: %s, Input: %s" % (e, line), file=stderr)
            new_line = line
    else:
        new_line = line
    print(new_line)
    # Flush so Telegraf sees each metric right away instead of waiting for the pipe buffer to fill
    stdout.flush()
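The tag-insertion helper can be checked on its own, without Telegraf or Pacemaker. The sketch below copies `lineprotocol_add_tag` from the script above and runs it on two sample lines, one with existing tags and one without. The sample metrics are made up for illustration.

```python
def lineprotocol_add_tag(line, key, value):
    # Same logic as the helper in the script above: the measurement name
    # ends at the first comma (tags follow) or the first space (no tags).
    first_comma = line.find(",")
    first_space = line.find(" ")
    if first_comma >= 0 and first_comma <= first_space:
        split_str = ","
    else:
        split_str = " "
    parts = line.split(split_str)
    first, rest = parts[0], parts[1:]
    first_new = first + "," + key + "=" + value
    return split_str.join([first_new] + rest)


# Hypothetical sample lines (not real output from my hosts).
with_tags = "system,host=node1 load1=0.5 1600000000000000000"
without_tags = "system load1=0.5 1600000000000000000"

tagged_with = lineprotocol_add_tag(with_tags, "crm_status", "active")
tagged_without = lineprotocol_add_tag(without_tags, "crm_status", "active")

print(tagged_with)
print(tagged_without)
```

In both cases the new tag lands directly after the measurement name. Note that this simple split does not handle measurement names containing escaped commas or spaces; that was not a concern for the plugins I was using.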
Telegraf configuration
Here’s a sample Telegraf configuration that routes data from the “system” input plugin through the execd processor plugin, and finally outputs it to InfluxDB.
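A minimal sketch of such a configuration could look like the following. The script path, InfluxDB URL, and database name are placeholders I am assuming for illustration, so adjust them to your environment. The `namepass` filter restricts the processor to metrics named "system"; other metrics bypass it.

```toml
# Input: the built-in "system" plugin
[[inputs.system]]

# Processor: pipe matching metrics through the external script
[[processors.execd]]
  # Hypothetical location of the script shown earlier
  command = ["/usr/bin/python", "/usr/local/bin/crm_status_tag.py"]
  # Only metrics named "system" are sent to the script
  namepass = ["system"]

# Output: write the tagged metrics to InfluxDB
[[outputs.influxdb]]
  # Placeholder URL and database name
  urls = ["http://127.0.0.1:8086"]
  database = "telegraf"
```

Since the script calls the crm commands through sudo, the user that Telegraf runs as would also need passwordless sudo rights for those two commands.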
In this example, we wanted to get the value of the tag from an external program. If the tag can be calculated from the incoming data itself, then things are much simpler. There are a lot of processor plugins, and many things can be achieved using just those.
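As an illustration of the simpler case, here is a hedged sketch using the built-in regex processor to derive a new tag from an existing one. It assumes each metric carries a `host` tag holding a fully qualified name such as `node1.example.com`; the `short_host` tag name is made up for this example.

```toml
[[processors.regex]]
  [[processors.regex.tags]]
    # Existing tag to read from (assumed to be present on the metric)
    key = "host"
    # Capture everything before the first dot
    pattern = "^([^.]+)\\..*$"
    replacement = "${1}"
    # Store the result in a new tag instead of overwriting "host"
    result_key = "short_host"
```

No external process is involved here, so there is nothing to cache and nothing to keep running alongside Telegraf.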