File: //var/lib/pcp/pmdas/prometheus/pmdaprometheus.python
#!/usr/bin/env pmpython
#
# Copyright (c) 2017-2018 Red Hat.
# Copyright (c) 2017 Ronak Jain.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
''' Performance Metrics Domain Agent exporting Prometheus endpoint metrics. '''
import os
import re
import time
import pickle
import traceback
import argparse
import threading
import subprocess
from ctypes import c_int
from socket import gethostname
from stat import ST_MODE, S_IXUSR, ST_CTIME
import requests
import sys
if sys.version[0] == '2':
import Queue as queue
else:
import queue as queue
import cpmapi as c_api
import cpmda
from pcp.pmapi import pmUnits
from pcp.pmda import PMDA, pmdaMetric, pmdaIndom, pmdaInstid
# Sort config file list - generally needed because os.walk()
# returns a different order on different platforms and/or filesystems.
# Config file list sorting is expensive with a large number of URLs
# and/or scripts. See the --nosort option to turn it off.
sort_conf_list = True
# Number of seconds to wait between poll attempts on a source that
# we've never been able to connect to & collect a list of metrics from.
empty_source_pmns_poll = 10.0
# Identifier-space limits used when composing pmids and indom numbers.
MAX_CLUSTER = 0xfff # ~ max. number of prometheus sources
MAX_METRIC = 0x3ff # ~ max. number of metrics per source
MAX_INDOM = 0x7fffffff # coincidentally, ~ product of above
# These numbers are combined to create unique numbers for several
# purposes. The first two types are used only for internal
# pmdaprometheus purposes, and are not visible as indoms to pmapi
# clients.
#
# indom# 0: maps source nicknames to 12-bit "cluster" numbers, cluster#0 is not used
# indom# 1..4095: for each source=cluster, map metric names to pmid numbers
# indom# 4096 + cluster#*1024 ...+1023: actual pmns indom for each metric

# Human-readable names for the pmapi label-type flags, used when
# reporting labels back through the label callbacks.
labeltype = {c_api.PM_LABEL_CONTEXT: "PM_LABEL_CONTEXT",
             c_api.PM_LABEL_DOMAIN: "PM_LABEL_DOMAIN",
             c_api.PM_LABEL_INDOM: "PM_LABEL_INDOM",
             c_api.PM_LABEL_CLUSTER: "PM_LABEL_CLUSTER",
             c_api.PM_LABEL_ITEM: "PM_LABEL_ITEM",
             c_api.PM_LABEL_INSTANCES: "PM_LABEL_INSTANCES"}
class Metric(object):
    ''' Metric information class.

    Wraps one prometheus metric belonging to one Source: derives the PCP
    metadata (type, semantics, units, pmns name) from the prometheus
    metric name plus any preceding "# PCP"/"# HELP"/"# TYPE" metadata
    lines, registers the metric (and an indom, if it has labelled
    instances) with the PMDA, and caches the most recently scraped
    values for fetch callbacks.
    '''

    def __init__(self,source,name,metricnum,instances,pcpline,helpline,typeline):
        # source: owning Source object
        # name: raw prometheus metric name
        # metricnum: persistent per-source item number (from pmids_table)
        # instances: label dict of the first sample; None or {} => singular
        # pcpline/helpline/typeline: metadata from preceding '#' comments
        self.source = source
        self.name = name
        self.metricnum = metricnum # seen during fetch callbacks
        self.pmid = source.pmda.pmid(source.cluster, metricnum) # add domain/cluster#
        # per-metric indom serial; ranges are disjoint per source/cluster
        # (see the indom numbering scheme comment at top of file)
        self.indom_number = (MAX_CLUSTER+1 +
                             (source.cluster * (MAX_METRIC+1)) +
                             metricnum)
        self.pcpline = pcpline
        self.typeline = typeline
        self.values = {} # instance-vector-to-value
        self.labels = {} # metric labels
        self.inst_labels = {} # dict of instid:labels (where labels is a dict)
        if instances is not None: # dict is empty for singular metrics (no indom)
            for key, val in instances.items():
                self.labels[key] = val
        self.assign_metadata()
        if instances: # non-empty label dict => metric has an indom
            self.mindom = self.source.pmda.indom(self.indom_number) # add domain#
            self.indom_table = PersistentNameTable(self.source.pmda,
                                                   self.indom_number, MAX_INDOM)
        else:
            self.mindom = c_api.PM_INDOM_NULL
            self.indom_table = None
        self.obj = pmdaMetric(self.pmid,self.mtype,self.mindom,self.msem,self.munits)
        if helpline: # it could be None!
            # undo pmwebd-style escaping; first line is the one-line help
            unescaped = helpline.replace('\\\\', '\\').replace('\\n', '\n')
            split = unescaped.split('\n')
            help_oneline = split[0] # must have at least one entry
            help_text = '\n'.join(split[1:]) # may have other entries
        else:
            help_oneline = ''
            help_text = ''
        try:
            self.source.pmda.add_metric(self.mname, self.obj, help_oneline, help_text)
            self.source.pmda.set_need_refresh()
        except Exception as e:
            self.source.pmda.err("Cannot add metric %s (%d): %s" % (self.mname, self.pmid, e))
        if self.source.pmda.dbg:
            self.source.pmda.debug("created metric %#x (%s) labels='%s'" % (self.pmid, self.mname, self.labels))

    def assign_metadata(self):
        ''' Compute metric metadata self.{mtype,mname,msem,munits}
            from the available information already stored in self.
        '''
        self.mtype = c_api.PM_TYPE_DOUBLE # NB: allow overriding sometime
        # Split the prometheus metric name by "_", to help
        # decode convention for squishing unit/scale data.
        pieces = self.name.split('_')
        if self.pcpline:
            pcpline_pieces = self.pcpline.split(' ')
        else:
            pcpline_pieces = []
        len_pcpline_pieces = len(pcpline_pieces)
        # semantics: an explicit "# PCP" hint wins; otherwise infer from
        # the TYPE line and prometheus naming conventions
        if len_pcpline_pieces >= 2 and pcpline_pieces[1] == 'instant':
            self.msem = c_api.PM_SEM_INSTANT
        elif len_pcpline_pieces >= 2 and pcpline_pieces[1] == 'counter':
            self.msem = c_api.PM_SEM_COUNTER
        elif len_pcpline_pieces >= 2 and pcpline_pieces[1] == 'discrete':
            self.msem = c_api.PM_SEM_DISCRETE
        elif self.typeline == 'counter' or 'total' in pieces or 'count' in pieces or 'sum' in pieces:
            self.msem = c_api.PM_SEM_COUNTER
        elif self.typeline == 'histogram' and 'bucket' in pieces:
            self.msem = c_api.PM_SEM_COUNTER
        else:
            self.msem = c_api.PM_SEM_INSTANT
        # pmns name: the PCP line (if any) supplies a ready-made name,
        # otherwise derive one from the prometheus metric name
        if len_pcpline_pieces >= 1:
            self.mname = 'prometheus.' + self.source.name + '.' + pcpline_pieces[0]
        else:
            self.mname = 'prometheus.' + self.source.name + '.' + self.name.replace(":", ".")
        # NB: ignore the PCP units suffix.
        # we could do:
        #    self.munits = cpmapi.pmParseUnitsStr(pcpline_pieces[2])
        # ... but we'd have to undo the base-scale conversion done by pmwebd. So
        # instead we use the inferred unit-scale (even though it doesn't match the
        # original pcp source). We don't preserve PM_TYPE_* either.
        if ((self.typeline == 'histogram' or self.typeline == 'summary')
            and 'count' in pieces): # regardless of UNIT; the *_sum peer has the proper type
            self.munits = pmUnits(0,0,1,0,0,0) # simple count
        elif self.typeline == 'histogram' and 'bucket' in pieces: # ditto
            self.munits = pmUnits(0,0,1,0,0,0) # simple count
        elif 'seconds' in pieces:
            self.munits = pmUnits(0,1,0,0,3,0)
        elif 'microseconds' in pieces: # not listed in https://prometheus.io/docs/practices/naming/
            self.munits = pmUnits(0,1,0,0,1,0)
        elif 'bytes' in pieces:
            self.munits = pmUnits(1,0,0,0,0,0)
        else:
            self.munits = pmUnits(0,0,0,0,0,0) # default, dimensionless old-school count
        assert self.munits and self.msem and self.mname and self.mtype

    def clear_values(self):
        ''' Erase all stored instance/value pairs, in anticipation of a new set. '''
        self.values.clear()

    def store_inst(self, labels, value):
        ''' Store given new instance/value pair. '''
        assert (labels is None) == (self.indom_table is None) # no metric indom flipflop
        if labels is None:
            # singular metric: single value, no indom
            self.values[c_api.PM_IN_NULL] = float(value)
        else:
            if self.pcpline: # pmwebd
                # NB: no quoting/transforms - preserve incoming value verbatim
                instname = labels['instance']
            else:
                self.indom_table.prefix_mode = True # Mark for instance# prefixed names
                # build a deterministic "key:value key:value ..." instance
                # name from the sorted label set
                instname = ""
                for key, val in sorted(labels.items()):
                    if len(instname) > 0:
                        instname += ' '
                    instname += key + ":" + val + ""
                    self.labels[key] = val
            inst = self.indom_table.intern_lookup_value(instname)
            self.values[inst] = float(value)
            self.inst_labels[inst] = labels
            if self.source.pmda.dbg:
                self.source.pmda.debug('store inst=%d instname="%s" value="%s"' % (inst, instname, value))
                self.source.pmda.debug('store inst_labels[%d] = %s' % (inst, labels))

    def save(self):
        ''' Flush any pending indom-table changes for this metric. '''
        if self.indom_table is not None:
            self.indom_table.save()

    def fetch_inst(self, inst): # fetch instance
        ''' Return [value, 1] for the given instance, or a [PM_ERR_*, 0]
            pair if the metric (or that instance) has no current value. '''
        if not self.values:
            # Metric may have disappeared
            return [c_api.PM_ERR_AGAIN, 0]
        if inst in self.values:
            return [self.values[inst], 1]
        return [c_api.PM_ERR_INST, 0]
class PersistentNameTable(object):
    '''Persistent name table. Answers name-to-number queries by assigning
    persistent ids. Updates pmda's indom table intermittently.
    Persistent by saving dictionary in $PCP_VAR_DIR/pmdas/,
    similarly to how pmdaCache functions do. A table may be flagged to
    add an instance-number prefix to all its instance-names, for ensuring
    uniqueness of the sort approved by pmLookupIndom(3).
    '''

    def __init__(self, pmda, indom, maxnum):
        # pmda: owning PMDA object; indom: serial number of the backing
        # indom; maxnum: largest identifier we are allowed to hand out
        self.pmda = pmda
        self.indom = indom
        self.maxnum = maxnum
        self.need_save = False
        self.prefix_mode = False # set later for non-PCP metric instances
        self.store_file_name = '%s/config/pmda/%d.%d.py' % (os.environ['PCP_VAR_DIR'],
                                                            pmda.domain, indom)
        try: # slightly used!
            with open(self.store_file_name, 'rb') as f:
                self.instances = pickle.load(f, encoding="bytes")
                try:
                    # Fetch the prefixness of the mapping early, so we send
                    # the correct indom strings to the C code the first time.
                    self.prefix_mode = pickle.load(f, encoding="bytes")
                except Exception: # older store files lack the second record
                    pass
            if self.pmda.dbg:
                self.pmda.debug("loaded %s%s, %d instances" %
                                (self.store_file_name,
                                 (" (pfx)" if self.prefix_mode else ""),
                                 len(self.instances)))
            self.need_save = True # to push values down into c pmda layer
            # XXX: need separate flag for 'push values to file'; too much i/o
        except Exception: # new! (no store file yet, or it was unreadable)
            self.instances = [] # won't be saved till nonempty
        # rebuild the reverse name -> id mapping
        self.names_to_instances = {}
        for i, n in enumerate(self.instances):
            self.names_to_instances[n] = i
        # prime the pmda indom table with an empty list; the pmdaIndom
        # object will be replaced shortly within .replace_indom()
        self.pmdaindom = pmdaIndom(pmda.indom(self.indom), [])
        self.pmda.add_indom(self.pmdaindom)
        self.save() # replace it with real data now, if non-empty

    def save(self):
        '''Push values to the PMDA layer as well as the backing store file.'''
        if not self.need_save:
            return
        # save to the pmda C layer
        indom_array = [] # pmdaInstid array, computed on demand
        if self.pmda.dbg:
            self.pmda.debug("indom %d:" % (self.pmda.indom(self.indom)))
        for i, n in enumerate(self.instances):
            if self.prefix_mode:
                # prefix the instance number for pmLookupIndom-style uniqueness
                instname = str(i) + " " + str(n)
            else:
                instname = str(n)
            if self.pmda.dbg:
                self.pmda.debug("%4d:\t%s" % (i, instname))
            indom_array.append(pmdaInstid(c_int(i), instname))
        # NB: use replace_indom(int,[...]) overload, to pass indom_array,
        # else it gets ignored
        self.pmda.replace_indom(self.pmda.indom(self.indom), indom_array)
        self.pmda.set_need_refresh()
        # save to disk too
        try: # slightly used!
            with open(self.store_file_name,'wb') as f:
                pickle.dump(self.instances, f, protocol=0) # 0: most compatible
                pickle.dump(self.prefix_mode, f, protocol=0)
            if self.pmda.dbg:
                self.pmda.debug("saved %s%s, %d instances" %
                                (self.store_file_name,
                                 (" (pfx)" if self.prefix_mode else ""),
                                 len(self.instances)))
            self.need_save = False # reset only on success
        except Exception as e:
            self.pmda.err("cannot save %s: %s" % (self.store_file_name, e))

    def intern_lookup_value(self, name):
        '''Add/lookup given name, return its persistent identifier.

        Raises ValueError once the identifier space for this table
        (self.maxnum) is exhausted.
        '''
        if name in self.names_to_instances: # fast path
            # mapping the translated name to inst
            return self.names_to_instances[name]
        # new name: assign the next sequential identifier
        num = len(self.instances)
        if num > self.maxnum:
            raise ValueError('Too many (%d) different names' % num)
        self.instances.append(name)
        self.names_to_instances[name] = num
        assert self.instances[num] == name
        self.need_save = True
        return num # the new inst number
class SampleLineParser(object):
    '''A parser for one metric [{instance}] value [timestamp] line.
    State machine is required since we're lexing text with embedded
    quoted freeform strings with punctuation, etc., so can't
    simply do substring searching.

    After construction: self.name holds the metric name, self.value the
    (unparsed) value string, and self.labels a dict of label name/value
    pairs, or None if the line carried no {...} label section.
    Raises ValueError on malformed input.
    '''

    # Each parse_* method below is one state of the machine; self.state
    # holds the current one and is fed the line one character at a time.

    def parse_metric_name_start(self, char):
        # skip leading whitespace, then start accumulating the name
        if char.isspace():
            pass
        else:
            self.state = self.parse_metric_name
            self.name = char

    def parse_metric_name(self, char):
        if char == '{':
            self.state = self.parse_label_name_start
            self.labels = {}
        elif char.isspace():
            self.state = self.parse_post_metric_name
        else:
            self.name += char

    def parse_post_metric_name(self, char):
        # between the name and either a label section or the value
        if char.isspace():
            pass
        elif char == '{':
            self.state = self.parse_label_name_start
            self.labels = {}
        else:
            self.state = self.parse_value
            self.value = char

    def parse_label_name_start(self,char):
        if char.isspace():
            pass
        elif char == '}': # empty (or trailing-comma) label section
            self.state = self.parse_post_labels
        else:
            self.state = self.parse_label_name
            self.lname = char

    def parse_label_name(self, char):
        if char.isspace():
            self.state = self.parse_label_equals
        elif char == '=':
            self.state = self.parse_label_value_start
        else:
            self.lname += char

    def parse_label_equals(self, char):
        if char.isspace():
            pass
        elif char == '=':
            self.state = self.parse_label_value_start
        else:
            raise ValueError("Expected =")

    def parse_label_value_start(self, char):
        if char.isspace():
            pass
        elif char == '"':
            self.state = self.parse_label_value
            self.lvalue = ""
        else:
            raise ValueError("Expected \"")

    def parse_label_value(self, char):
        if char == '\\':
            self.state = self.parse_label_value_escapechar
        elif char == '"': # closing quote: commit this label
            self.state = self.post_label_value
            self.labels[self.lname] = self.lvalue
            self.lname = None
            self.lvalue = None
        else:
            self.lvalue += char

    def parse_label_value_escapechar(self, char):
        # XXX: maybe pass through \-escaped identifiers literally through to pcp
        # otherwise we'd have to filter them out at indom-instance-name creation
        if char == '\\':
            self.state = self.parse_label_value
            self.lvalue += '\\'
        elif char == 'n':
            self.state = self.parse_label_value
            self.lvalue += '\n'
        elif char == '"':
            self.state = self.parse_label_value
            self.lvalue += '\"'
        else:
            self.state = self.parse_label_value
            self.lvalue += '\\' + char # transcribe \XYZ literally

    def post_label_value(self, char):
        if char.isspace():
            pass
        elif char == ',': # another label follows
            self.state = self.parse_label_name_start
        elif char == '}':
            self.state = self.parse_post_labels
        else:
            raise ValueError("Expected , or }")

    def parse_post_labels(self, char):
        if char.isspace():
            pass
        else:
            self.state = self.parse_value
            self.value = char

    def parse_value(self, char):
        if char.isspace(): # timestamp possibly following
            self.state = self.parse_chomp
        else:
            self.value += char

    def parse_chomp(self, char): # ignored stuff (optional timestamp etc.)
        pass

    def __init__(self, line):
        '''Run the state machine over one exposition-format line.'''
        # mis-initialize output variables to force state
        # machine transitions to do it right
        self.name = None
        self.value = None
        self.labels = None
        self.lname = None
        self.lvalue = None
        # run state machine
        self.state = self.parse_metric_name_start
        for char in line:
            self.state(char)
        # raise (rather than assert, which vanishes under python -O) if
        # the line never yielded both a name and a value
        if not self.name:
            raise ValueError("missing metric name")
        if not self.value:
            raise ValueError("missing metric value")
class Source(object):
    '''An instance of this class represents a distinct Prometheus exporter,
    identified by a nickname (the next PMNS component beneath prometheus.*),
    and a URL. Metrics will show up at prometheus.NICKNAME.MET.RIC as/when
    the source is online.
    '''

    def __init__(self, name, cluster, path, is_scripted, pmda):
        # name: source nickname; cluster: persistent cluster number;
        # path: config file (.url) or executable; is_scripted: True if
        # path is an executable rather than a .url file
        self.name = name # source nickname
        self.cluster = cluster # unique/persistent id# for nickname
        self.path = path # pathname to .url or executable file
        self.url = None
        self.parse_url_time = 0 # timestamp of config file when it was last parsed
        self.is_scripted = is_scripted
        self.pmda = pmda # the shared pmda
        self.requests = None
        self.headers = None
        self.filterlist = None
        self.document = None
        self.refresh_time = 0 # "never"
        if not is_scripted:
            # source is a URL. Create a session for it.
            self.requests = self.pmda.requests # allow persistent connections etc.
            self.headers = {} # dict of headers for the http get
            self.filterlist = [] # list of filters
        # persistently assign numeric id to our metrics
        self.pmids_table = PersistentNameTable(self.pmda, cluster, MAX_METRIC)
        self.metrics_by_name = {} # name -> Metric
        self.metrics_by_num = {} # number (last component of pmid) -> Metric

    def old_enough_for_refresh(self):
        '''But what is "old"? If it is empty (no metrics), then it
        has probably never been connected to successfully.
        OTOH, if it hasn't been fetched from "recently", there may
        be new metrics. So we could track the last time a fetch was
        done to this source, and "time out" the PMNS from it.
        '''
        now = time.time()
        last_try_age = now - self.refresh_time
        return len(self.metrics_by_name) == 0 and last_try_age > empty_source_pmns_poll

    def check_filter(self, name, entrytype):
        '''
        return True if name of type entrytype ("METRIC" or "LABEL")
        is included (and not excluded) by filters for this source.
        First match prevails. No match => True
        '''
        if self.filterlist is None:
            return True
        for f in self.filterlist:
            fs = f.split() # INCLUDE|EXCLUDE METRIC|LABEL regex
            if fs[1] == entrytype:
                pat = ' '.join(fs[2:]) # regex may have spaces
                rx = self.pmda.lookup_regex(pat)
                if self.pmda.dbg:
                    self.pmda.debug("check_filter(%s) fs=%s regex='%s'" % (name,fs,pat))
                if rx.match(name):
                    if self.pmda.dbg:
                        self.pmda.debug(".... rx.match(%s,%s) -> True" % (pat,name))
                    return fs[0] == "INCLUDE"
        # no regex match => include
        return True

    def filter_labelset(self, labelset):
        ''' return labelset trimmed by filters '''
        included_labels = {}
        if labelset is None:
            return None # singular; nothing to include or exclude
        for lname, lval in labelset.items():
            if self.check_filter(lname, "LABEL"):
                included_labels[lname] = lval
        if included_labels == {}:
            return None # no labels => singular indom
        return included_labels

    def parse_metric_line(self, line, pcpline, helpline, typeline):
        '''
        Parse the sample line, identify/create corresponding metric & instance.
        '''
        try:
            sp = SampleLineParser(line)
            if self.pmda.dbg:
                self.pmda.debug("parsed '%s' -> %s %s %s" % (line, sp.name, sp.labels, sp.value))
            included_labels = self.filter_labelset(sp.labels)
            if self.pmda.dbg:
                self.pmda.debug("included_labels '%s'" % (included_labels))
            if sp.name in self.metrics_by_name:
                m = self.metrics_by_name[sp.name]
                assert self.metrics_by_num[m.metricnum] == m
                m.store_inst(included_labels, sp.value)
            else:
                # check metric is not excluded by filters
                fullname = "prometheus.%s.%s" % (self.name, sp.name)
                if self.pmda.dbg:
                    self.pmda.debug("Checking metric '%s'" % (fullname))
                if not self.check_filter(fullname, "METRIC"):
                    self.pmda.log("Metric %s excluded by config filters" % fullname)
                    return
                # brand-new metric: assign a persistent item number and
                # register it with the PMDA
                metricnum = self.pmids_table.intern_lookup_value(sp.name)
                m = Metric(self,sp.name,metricnum,included_labels, pcpline,helpline,typeline)
                self.metrics_by_name[sp.name] = m
                self.metrics_by_num[metricnum] = m # not pmid!
                m.store_inst(included_labels, sp.value)
                self.pmda.set_notify_change()
        except Exception as e:
            self.pmda.err("cannot parse/store %s: %s" % (line, e))
            traceback.print_exc() # traceback can be handy here

    def parse_lines(self, text):
        '''Refresh all the metric metadata as it is found, including creating
        new metrics. Store away metric values for subsequent
        fetch()es. Parse errors may result in exceptions.
        That's OK, we don't try heroics to parse non-compliant
        data. Return number of metrics extracted.
        '''
        num_metrics = 0
        lines = text.splitlines()
        pcpline = None
        helpline = None
        typeline = None
        state = "metadata"
        for line in lines:
            if self.pmda.dbg:
                self.pmda.debug("line: %s state: %s" % (line, state))
            l = line.strip() # whitespace
            if l == "": # blank line, ignore, no state change
                continue
            elif l.startswith("#"): # comment
                if state == "metrics":
                    state = "metadata"
                    pcpline = None # NB: throw away previous block's metadata
                    helpline = None
                    typeline = None
                lp = l.split()
                if len(lp) < 2:
                    continue
                # NB: for a well-formed exporter file,
                # the # metadata blocks must precede
                # the metric values; we can ignore
                # lp[2].
                if lp[1] == 'PCP':
                    pcpline = ' '.join(lp[2:])
                elif lp[1] == 'HELP':
                    # assume lp[2] matches metric name
                    helpline = ' '.join(lp[3:])
                elif lp[1] == 'TYPE':
                    # assume lp[2] matches metric name
                    typeline = ' '.join(lp[3:])
                else:
                    pass # ignore other comment lines
            else: # metric{...} value line
                state = "metrics"
                # NB: could verify helpline/typeline lp[2] matches,
                # but we don't have to go out of our way to support
                # non-compliant exporters.
                self.parse_metric_line(l,pcpline,helpline,typeline)
                num_metrics += 1
        # NB: this logic only ever -adds- Metrics to a Source. If a source
        # stops supplying some metrics, then a PCP app will see a PM_ERR_INST
        # coming back when it tries to fetch them. We could perhaps keep the
        # set of -current- metrics fresh, i.e., track any metrics that were
        # in the Source but were not processed by any parse_metric_line() call.
        # Then we could remove the Metric, and thereby trigger a PM_ERR_PMID
        # for them. In both cases though, we have no values.
        return num_metrics

    def parse_url_config(self, filepath):
        '''
        Parse a URL config file. The first line is always the URL.
        Remaining lines are prefixed with a keyword. Supported keywords
        include '#' for a comment, 'HEADER:' to add to the header passed
        to the headers dict parameter to the get() call. Note the ':' are
        important, and spaces are collapsed. e.g.
            # here is an example header
            http://someserver/someplace/endpoint.html
            HEADER: authtoken: some auth token
            # filters are used to include/exclude metric names
            FILTER: {INCLUDE|EXCLUDE} METRIC regex ...
            # filters are also used to include/exclude labels
            FILTER: {INCLUDE|EXCLUDE} LABEL regex ...
        '''
        with open(filepath, 'r') as f:
            conf = f.read().strip().split('\n')
        self.url = conf[0]
        self.filterlist = [] # filters matching metric names or labels
        for line in conf[1:]:
            if len(line) == 0 or line.startswith('#'):
                continue
            elif line.startswith('HEADER:'):
                header=line.split(':',2)
                key=''.join(header[1].split())
                val=' '.join(header[2:]).lstrip()
                self.headers[key] = val
            elif line.startswith('FILTER:'):
                # strip off 'FILTER:' and any leading space
                # each list entry is {INCLUDE|EXCLUDE} METRIC regex ... to end of line
                # NB: maxsplit=1 so a regex containing ':' isn't truncated
                filt = line.split(':',1)[1].lstrip()
                self.filterlist.append(filt)
            else:
                self.pmda.log('Warning: %s ignored unrecognised config entry "%s"' % (self.url,line))
        if self.pmda.dbg:
            self.pmda.debug("DEBUG url: %s HEADERS: %s" % (self.url,self.headers))
            self.pmda.debug("DEBUG url: %s FILTERS: %s" % (self.url,self.filterlist))
        return

    def refresh1(self, timeout):
        '''
        If the Source config entry is a URL (with ".url" extension), find the
        target URL by reading the .url file, fetch (http GET) the target and
        then save resulting document.
        If the Source config entry is executable, run it, expecting prometheus
        formatted data on its stdout, which is then saved.
        '''
        # clear cached values from all my metrics
        for m in self.metrics_by_name.values():
            m.clear_values()
        # XXX: ditch metrics no longer found in document
        # bump the fetch call counter for this source, and for the total (cluster 0)
        self.pmda.stats_fetch_calls[self.cluster] += 1
        self.pmda.stats_fetch_calls[0] += 1
        self.refresh_time = fetch_time = time.time()
        try:
            s = os.stat(self.path)
            if self.is_scripted:
                # check file still exists and is executable
                if not s[ST_MODE] & S_IXUSR:
                    # NB: one value for the single %s conversion (the old
                    # two-conversion format string raised TypeError here)
                    self.pmda.err("cannot execute '%s'" % self.path)
                    return
            elif self.parse_url_time < s[ST_CTIME]:
                # (re)parse the URL from given file
                self.parse_url_config(self.path)
                self.parse_url_time = s[ST_CTIME]
        except Exception as e:
            self.pmda.err("cannot read %s: %s" % (self.path, e))
            return
        # fetch the document
        try:
            if self.is_scripted:
                # Execute file, expecting prometheus metric data on stdout.
                # stderr goes to the PMDA log. Failures are caught below.
                self.document = subprocess.check_output(self.path, shell=False).decode()
            else:
                # fetch the URL
                if self.url.startswith('file://'):
                    with open(self.url[7:],'r') as f:
                        self.document = f.read()
                else:
                    r = self.requests.get(self.url,headers=self.headers,timeout=timeout)
                    r.raise_for_status() # non-200? ERROR
                    # NB: the requests package automatically enables http keep-alive and compression
                    self.document = r.text
            # update fetch time counter stats, in ms
            incr = int(1000 * (time.time() - fetch_time))
            self.pmda.stats_fetch_time[self.cluster] += incr
            self.pmda.stats_fetch_time[0] += incr # total for all sources
        except Exception as e:
            self.pmda.err('Warning: cannot fetch URL or execute script %s: %s' % (self.path, e))
            return

    def refresh2(self, timeout):
        '''
        Parse the saved document that was recently saved in refresh1().
        '''
        if self.document is None: # error during fetch?
            return
        # parse and handle the prometheus formatted metric data
        parse_time = time.time()
        s = self.parse_lines(self.document)
        # update parse time counter stats, in ms
        incr = int(1000 * (time.time() - parse_time))
        self.pmda.stats_parse_time[self.cluster] += incr
        self.pmda.stats_parse_time[0] += incr # total
        # save metric & indom lookup tables changes, if any
        for m in self.metrics_by_name.values():
            try: # NB: must process whole list even if exceptions escape
                m.save()
            except Exception: # ... which they won't, this is just belt & suspenders
                pass
        self.pmids_table.save()
        if self.pmda.dbg:
            self.pmda.debug("fetched %d bytes with %d metrics from URL or script %s" % (len(self.document), s, self.path))
        self.document = None # don't hang onto it

    def fetch(self, item, inst):
        ''' Retrieve metric/instance values that ought to have been found
            by a recent refresh(). PM_ERR_AGAIN signals a no go.'''
        try:
            m = self.metrics_by_num[item]
            return m.fetch_inst(inst)
        except Exception as e:
            self.pmda.log("Warning: cannot fetch item %d inst %d: %s" % (item,inst,e))
            return [c_api.PM_ERR_AGAIN, 0]
class PrometheusPMDA(PMDA):
def __init__(self, pmda_name, domain, config, timeout, user, debugflag, logfile):
'''
Initialize the PMDA. This can take a while for large configurations.
The prometheus entry in pmcd.conf specifies to start up in "notready"
mode. Once startup and init is complete, we call pmda.pmda_ready()
to tell PMCD we are ready to process requests, just prior to calling
run() to enter the main loop. See below.
'''
# must first set user and connect to the invoking pmcd
PMDA.__init__(self, pmda_name, domain, logfile)
if user is not None:
self.set_user(user)
self.log('Note: running as user "%s"' % user)
self.connect_pmcd()
# Write debugging messages to log, see --debug cmdline option
# and the storable metric $(pmda_name).control.debug
self.dbg = debugflag
# now everything else may take time
self.pmda_name = pmda_name
self.config_dir = os.path.normpath(config)
self.config_dir_ctime = None
self.timeout = timeout
# a single central Session that all our sources can concurrently reuse
self.requests = requests.Session() # allow persistent connections
# the list of configured sources
self.source_by_name = {}
# persistently assign numeric id to source names -> pmid "cluster" numbers
self.cluster_table = PersistentNameTable(self, 0, MAX_CLUSTER)
reserved_cluster = self.cluster_table.intern_lookup_value("control")
assert reserved_cluster == 0
self.source_by_cluster = {}
# compiled regex cache
self.regex_cache = {}
# Add a IS_DYNAMIC_ROOT metric that serves as a reminder to
# pmcd to delegate all pmns requests to us. Do it early, before
# other metrics may populate beneath the $(pmda_name).* prefix.
dynamic_pmid = (0 << 31) | (511 << 22) | (args.domain << 10) | 0
dynamic_root = pmdaMetric(dynamic_pmid, 0, 0, 0, pmUnits())
self.add_metric(self.pmda_name, dynamic_root, 'dynamic root for %s metrics' % self.pmda_name)
# Add statistical metrics
self.sources_indom = self.indom(0)
# fetch call counter, per-source end-point
self.stats_fetch_calls = {0:0} # counter, keyed by cluster number
self.add_metric('%s.control.calls' % self.pmda_name, pmdaMetric(self.pmid(0, 1),
c_api.PM_TYPE_U64, self.sources_indom, c_api.PM_SEM_COUNTER,
pmUnits(0, 0, 1, 0, 0, c_api.PM_COUNT_ONE)),
'per-end-point source call counter')
# fetch time counter, per-source end-point
self.stats_fetch_time = {0:0} # time counter in msec, keyed by cluster number
self.add_metric('%s.control.fetch_time' % self.pmda_name, pmdaMetric(self.pmid(0, 2),
c_api.PM_TYPE_U64, self.sources_indom, c_api.PM_SEM_COUNTER,
pmUnits(0, 1, 0, 0, c_api.PM_TIME_MSEC, 0)), # millisecond counter
'per-end-point source fetch time counter, excluding parse time')
# parse time counter, per-source end-point
self.stats_parse_time = {0:0} # time counter in msec, keyed by cluster number
self.add_metric('%s.control.parse_time' % self.pmda_name, pmdaMetric(self.pmid(0, 3),
c_api.PM_TYPE_U64, self.sources_indom, c_api.PM_SEM_COUNTER,
pmUnits(0, 1, 0, 0, c_api.PM_TIME_MSEC, 0)), # millisecond counter
'per-end-point source parse time counter, excluding fetch time')
# verbose/debug messages metric
self.add_metric('%s.control.debug' % self.pmda_name, pmdaMetric(self.pmid(0, 4),
c_api.PM_TYPE_U32, c_api.PM_INDOM_NULL, c_api.PM_SEM_DISCRETE,
pmUnits(0, 0, 0, 0, 0, 0)),
'debug flag to enable verbose log messages, to enable: pmstore %s.control.debug 1' % self.pmda_name)
# schedule a refresh
self.set_need_refresh()
# store callback for prometheus.control.debug
self.set_store_callback(self.prom_store_callback)
# used frequently, but we welcome it only for initial pmns population purposes
# NB: If exceptions propagate out to cpmda, the log will contain entries such as:
# Error: fetch_callback: callback failed
self.set_refresh_metrics(self.refresh_metrics_for_pmns)
self.set_refresh_all(self.refresh_some_clusters_for_fetch) # "all" is a misnomer
self.set_fetch_callback(self.fetch_callback)
# label callbacks
self.set_label(self.prom_label)
self.set_label_callback(self.prom_label_callback)
def lookup_regex(self, pat):
''' cache of compiled regex '''
if pat not in self.regex_cache.keys():
self.regex_cache[pat] = re.compile(r"%s" % pat)
self.debug("lookup_regex: added '%s'" % pat) if self.dbg else None
return self.regex_cache[pat]
def assert_source_invariants(self, name=None, cluster=None):
''' Assert some invariants about the known sources '''
if name:
s = self.source_by_name[name]
assert s == self.source_by_cluster[s.cluster]
if cluster:
s = self.source_by_cluster[cluster]
assert s == self.source_by_name[s.name]
def traverse(self, dir, ctime):
''' Return list of files below dir, recursively '''
ret = []
m = os.path.getctime(dir)
if ctime is None or m > ctime:
ctime = m
for path, subdirs, files in os.walk(dir):
for f in files:
if not f.startswith("."):
fname = os.path.join(path, f)
m = os.path.getctime(fname)
if ctime is None or m > ctime:
ctime = m
ret.append(fname)
for d in subdirs:
m, r = self.traverse(os.path.join(path, d), ctime)
if ctime is None or m > ctime:
ctime = m
return ctime, ret
def rescan_confdir(self):
'''Scan the configuration directories for any new .url files
or scripts. Ensure there is a Source registered in the
self.source_by_name dictionary for each one.
First check if anything in the config dirs has changed lately,
else do nothing. This is important because this callback is
invoked frequently by src/python/pmda.c.
'''
traverse_time = time.time()
dir_ctime, conf_filelist = self.traverse(self.config_dir, self.config_dir_ctime)
traverse_time = time.time() - traverse_time
if self.config_dir_ctime is None or self.config_dir_ctime < dir_ctime:
self.config_dir_ctime = dir_ctime
else: # no new or changed conf files, don't rescan directory
return
self.log("Config change detected, traversed %d config entries in %.04fs, rescanning ..." % (len(conf_filelist), traverse_time))
nickname_regexp = self.lookup_regex(r"^[A-Za-z][A-Za-z0-9_.]*$")
# XXX: nuke sources related to removed files
save_cluster_table = False
if sort_conf_list:
# sorted for indom cluster consistency
conf_filelist = sorted(conf_filelist)
for file in conf_filelist:
# compute nickname for source:
# the part of the filename before .url
file_split = os.path.splitext(file)
self.debug("found %s => %s" % (file, file_split)) if self.dbg else None
# check if it's executable, or has ".url" suffix
is_scripted = os.stat(file)[ST_MODE] & S_IXUSR != 0
if not is_scripted and (len(file_split) != 2 or file_split[1] != ".url"):
# ignore this file - not executable and doesn't end in ".url"
self.debug("Warning: ignored config file '%s', doesn't end in '.url' and not executable." % file) if self.dbg else None
continue
# convert file path name into a PCP metric name
name = file_split[0].replace(self.config_dir + "/", "").replace("/",".")
if name == "control":
self.err("Warning: ignored config file '%s', '%s.control' is a reserved PMNS subtree for PMDA statistics" % (file, self.pmda.pmda_name))
continue
if not nickname_regexp.match(name):
self.err("Warning: ignored config file '%s', unsuitable for PCP namespace" % file)
continue
if name in self.source_by_name:
# this source is already known
self.assert_source_invariants(name=name)
pass
else:
try:
path = file
cluster = self.cluster_table.intern_lookup_value(name)
source = Source(name, cluster, path, is_scripted, self)
self.source_by_name[source.name] = source
self.source_by_cluster[source.cluster] = source
# initialize statistics
self.stats_fetch_calls[cluster] = 0
self.stats_fetch_time[cluster] = 0
self.stats_parse_time[cluster] = 0
save_cluster_table = True
self.log("Found source %s cluster %d" % (name, cluster))
except Exception as e:
self.err("Error allocating new cluster/source %s (%s)" % (name, e))
if save_cluster_table:
self.cluster_table.save()
self.set_notify_change()
def refresh_metrics_for_pmns(self):
    '''Bring the Source list up to date with the config directory, then
    run a batch fetch over every source that is due for a refresh, so
    the PMNS gets populated/refreshed.
    '''
    self.rescan_confdir()  # sync our Source list with on-disk config
    # batch-fetch all due sources to ensure the pmns is populated
    due = [cluster for cluster, source in self.source_by_cluster.items()
           if source.old_enough_for_refresh()]
    if due:
        self.refresh_some_clusters_for_fetch(due)
def fetch_callback(self, cluster, item, inst):
    ''' Main fetch callback which returns the value of the metric.

    Returns a [value, status] pair: status 1 with the metric value on
    success, or a PM_ERR_* code with status 0 on failure.
    '''
    if cluster == 0: # The reserved 'control' cluster: statistics
        if item == 1: # per-source calls counter
            return [self.stats_fetch_calls[inst], 1] if inst in self.stats_fetch_calls else [c_api.PM_ERR_INST, 0]
        elif item == 2: # per-source fetch time counter
            return [self.stats_fetch_time[inst], 1] if inst in self.stats_fetch_time else [c_api.PM_ERR_INST, 0]
        elif item == 3: # per-source parse time counter
            return [self.stats_parse_time[inst], 1] if inst in self.stats_parse_time else [c_api.PM_ERR_INST, 0]
        elif item == 4: # $(pmda_name).control.debug
            return [self.dbg, 1]
        return [c_api.PM_ERR_PMID, 0]
    self.assert_source_invariants(cluster=cluster)
    try:
        if cluster in self.source_by_cluster: # end-points
            return self.source_by_cluster[cluster].fetch(item, inst)
        else:
            return [c_api.PM_ERR_PMID, 0]
    except Exception as e:
        # Previously the exception was swallowed with no trace at all;
        # at least record it when debugging before telling the client
        # to try again.
        self.debug("fetch_callback: cluster %d item %d inst %d: %s" % (cluster, item, inst, e)) if self.dbg else None
        return [c_api.PM_ERR_AGAIN, 0]
def refresh1_worker(self, inqueue, outqueue):
    '''Fetcher-thread body: drain cluster numbers from inqueue, run the
    blocking refresh1() stage for each, and forward every cluster number
    taken on to outqueue -- even on failure -- so the consumer in
    refresh_some_clusters_for_fetch() can count completions.
    '''
    while True:
        # Use get_nowait() rather than an empty()/get() pair: with
        # several worker threads the queue can drain between those two
        # calls, leaving this thread blocked in get() forever and
        # leaking one daemon thread per fetch batch.
        try:
            cluster = inqueue.get_nowait()
        except queue.Empty:
            return
        try:
            if cluster > 0:  # cluster 0 is the control cluster: no source
                self.assert_source_invariants(cluster=cluster)
                self.source_by_cluster[cluster].refresh1(self.timeout)
        except Exception as e:
            self.err("Error: Cannot refresh1 cluster %d: %s" % (cluster, e))
        finally:
            # always report the cluster, so the consumer's count works out
            outqueue.put(cluster)
def refresh2_worker(self, cluster):
    '''Run the post-fetch refresh2() stage for one source cluster.
    Cluster 0 (the reserved control cluster) is skipped; errors are
    logged rather than raised.'''
    if cluster <= 0:
        return
    try:
        self.assert_source_invariants(cluster=cluster)
        self.source_by_cluster[cluster].refresh2(self.timeout)
    except Exception as exc:
        self.err("Error: Cannot refresh2 cluster %d: %s" % (cluster, exc))
def refresh_some_clusters_for_fetch(self, _clusters):
    '''Called once per pmFetch batch handling, before
    prometheus_fetch_callback calls. Creates threads to
    fetch data in parallel.

    _clusters: iterable of cluster numbers (arrive as PyLong from the
    PMDA bindings) whose sources should be refreshed before values
    are handed back to the fetch callback.
    '''
    clusters = [int(l) for l in _clusters] # convert from PyLong
    inpqueue = queue.Queue()   # work to do: clusters awaiting refresh1()
    workqueue = queue.Queue()  # work done: clusters ready for refresh2()
    self.debug("refreshing clusters %s" % clusters) if self.dbg else None
    for c in clusters:
        inpqueue.put(c)
    # We start up only a limited number of concurrent fetcher threads. We don't
    # want to open an unlimited number of .url/.conf files nor sockets.
    num_threads = min(100,len(clusters))
    threads = []
    for i in range(num_threads):
        t = threading.Thread(target=self.refresh1_worker,
                             args=(inpqueue,workqueue))
        threads.append(t)
        t.daemon = True # allow shutdown if some straggler is still running
        t.start()
    # Each will exit when the input workqueue becomes empty.
    # Consume the documents in the main thread, as they arrive in
    # the workqueue. Do this single-threaded only because the
    # python vm is effectively single-threaded for computations
    # anyway, so cpu-bound multithreaded apps get bogged down by
    # the big interpreter lock. The exception handler in
    # refresh1_worker() tries to guarantee that we will get all
    # the cluster numbers show up eventually.
    for c in clusters: # expected eventual size of workqueue
        # blocks until some worker thread finishes one cluster
        cluster = workqueue.get()
        self.refresh2_worker(cluster)
    # XXX: timeout on this too?
# XXX: timeout on this too?
def set_need_refresh(self):
    '''Flag the PMDA metadata as changed via the cpmda bindings and
    rebuild the namespace immediately, so clients see the new PMNS.'''
    cpmda.set_need_refresh()
    self.pmns_refresh()
def prom_label(self, ident, type):
    '''
    return a JSONb formatted string (labelset) for given ident of given type

    ident: indom or cluster number, depending on type.
    type:  one of the c_api.PM_LABEL_* constants.
    '''
    self.debug('prom_label(ident=%d (%#x), type=%s)' % (ident, ident, labeltype[type])) if self.dbg else None
    if type == c_api.PM_LABEL_INDOM:
        # labels for indom=ident: report the first source that owns a
        # metric with this indom
        indom = ident
        for k, v in self.source_by_cluster.items():
            for m, mv in v.metrics_by_name.items():
                if mv.mindom == indom:
                    return '{"source":"%s"}' % v.name
        return '{}' # no indom labels
    elif type == c_api.PM_LABEL_CLUSTER:
        # Labels for a cluster, i.e. an individual source.
        # Note: if using dbpmda, test this with e.g. label cluster 144.1
        # (you need the domain (default 144) and the cluster number (1, 2, ..)
        try:
            s = self.source_by_cluster[ident]
            if s.is_scripted:
                # The script should insert a hostname="somewhere" label if it
                # fetches from a remote host. This will override the hostname
                # label inserted by pmcd (at the context label level). Details
                # of the label hierarchy rules are in the "PRECEDENCE" section
                # of the pmLookupLabels(3) man page.
                return '{"script":"%s","source":"%s"}' % (s.path, s.name)
            else:
                # extract hostname label from the URL
                if s.url.startswith("http"):
                    host = s.url.replace("http://", "").replace("https://", "").split("/")[0]
                elif s.url.startswith("file://"):
                    host = gethostname()
                else:
                    host = "localhost"
                if host == "localhost":
                    host = gethostname()
                return '{"hostname":"%s","source":"%s","url":"%s"}' % (host, s.name, s.url)
        except KeyError:
            # was a bare except, which also swallowed SystemExit and
            # KeyboardInterrupt; the intent (per the original comment)
            # was only to ignore an unknown cluster number
            return '{}'
    else: # empty label set for all other types
        return '{}'
def prom_label_callback(self, indom, inst):
    '''
    return label for given instance ID in given indom

    Produces a JSON object with an "instname" label plus any scraped
    prometheus labels recorded for that instance.
    '''
    instlabels = ''
    try:
        self.debug('prom_label_callback(indom=%#x, inst=%d)' % (indom, inst)) if self.dbg else None
        for k, v in self.source_by_cluster.items():
            for m, mv in v.metrics_by_name.items():
                if indom != mv.mindom:
                    continue
                for i, n in enumerate(mv.indom_table.instances):
                    if inst != i:
                        continue
                    # found: first add the instname label, which
                    # matches the external instance name
                    self.debug('prom_label_callback: found inst label "%s"' % n) if self.dbg else None
                    instlabels = '"instname": "%s %s"' % (i, n)
                    # Then each label for this instance separately.
                    # Check membership *before* indexing inst_labels:
                    # previously a debug message indexed
                    # mv.inst_labels[i] unconditionally, so instances
                    # without labels raised KeyError and bailed out
                    # through the exception handler below.
                    if i in mv.inst_labels:
                        self.debug('prom_label_callback: i=%d mv.inst_labels[%d]=%s' % (i, i, mv.inst_labels[i])) if self.dbg else None
                        # lname/lvalue: don't shadow the outer loop's v
                        for lname, lvalue in mv.inst_labels[i].items():
                            instlabels = instlabels + "," + '"%s":"%s"' % (lname, lvalue)
    except Exception as e:
        self.debug("prom_label_callback: exception %s" % e)
        self.debug(traceback.format_exc())
    return '{%s}' % instlabels
def prom_store_callback(self, cluster, item, inst, val):
'''
To enable verbose log messages: pmstore prometheus.control.debug 1
'''
if cluster == 0 and item == 4:
if val < 0:
return c_api.PM_ERR_BADSTORE
self.dbg = (val != 0)
self.log('%s.control.debug: set to %d' % (self.pmda_name, self.dbg))
return 0
if cluster < MAX_CLUSTER and item < MAX_METRIC:
return c_api.PM_ERR_PERMISSION
return c_api_PM_ERR_PMID
def debug(self, s):
    '''Write s to the PMDA log with a "debug: " prefix, but only while
    debugging is enabled (see the control.debug metric).'''
    if not self.dbg:
        return
    super(PrometheusPMDA, self).log("debug: " + s)
def log(self, s):
    '''Write informational message s to the PMDA log file.'''
    super(PrometheusPMDA, self).log(s)
def err(self, s):
    '''Write error message s to the PMDA log file.'''
    super(PrometheusPMDA, self).err(s)
if __name__ == '__main__':
    # Command-line handling; the defaults match the standard PCP install
    # layout, and the Install script normally supplies the options.
    parser = argparse.ArgumentParser(
        description='Prometheus PMDA.',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument(
        '-D', '--debug',
        default=False,
        help='enable debug messages in log (default disabled).',
        action='store_true')
    parser.add_argument(
        '-c', '--config',
        type=str,
        default='config.d',
        help='configuration directory')
    parser.add_argument(
        '-d', '--domain',
        type=int,
        default=144,
        help='PMDA domain number (default 144)')
    parser.add_argument(
        '-l', '--log',
        type=str,
        default='prometheus.log',
        help='log filename')
    parser.add_argument(
        '-n', '--nosort',
        default=False,
        help='do not sort the config file list (default is to sort)',
        action='store_true')
    parser.add_argument(
        '-r', '--root',
        type=str,
        default='prometheus',
        help='dynamic PMNS root name')
    parser.add_argument(
        '-t', '--timeout',
        type=int,
        default=2,
        help='HTTP GET timeout for each end-point URL (default 2 seconds)')
    parser.add_argument(
        '-u', '--user',
        type=str,
        default=None,
        help='set the username to run under (default is current user)')
    args = parser.parse_args()
    if args.nosort:
        sort_conf_list = False  # see the os.walk() ordering note at top of file
    # NOTE(review): assumes $PCP_PMDAS_DIR is set (true when started by
    # pmcd/Install); os.getenv() returns None otherwise and os.path.join
    # would raise -- confirm whether a fallback is wanted.
    pmdadir = os.path.join(os.getenv('PCP_PMDAS_DIR'), args.root)
    if not args.config.startswith("/"):
        # a relative config path is resolved against this PMDA's directory
        args.config = os.path.join(pmdadir, args.config)
    # This PMDA starts up in the "notready" state, see the Install script where
    # the IPC protocol is ipc_prot="binary notready". See also pmcd(1) man page.
    # The "binary notready" setting can also be manually configured in pmcd.conf.
    # Default domain number is PMDA(144), see -d option.
    pmda = PrometheusPMDA(args.root, args.domain, args.config, args.timeout, args.user, args.debug, args.log)
    # Scan initial config
    pmda.log("Initializing ... currently in notready state.")
    # Allow a generous (10x) timeout for the initial scan: every configured
    # end-point is contacted while pmcd waits for us to become ready.
    pmda.timeout = args.timeout * 10
    pmda.refresh_metrics_for_pmns()
    pmda.timeout = args.timeout
    # Tell PMCD that we are now ready to process requests.
    pmda.pmda_ready()
    pmda.log("Ready to process requests")
    # Now enter the main loop
    pmda.run()