HEX
Server: Apache/2.4.34 (Red Hat) OpenSSL/1.0.2k-fips
System: Linux WORDPRESS 3.10.0-1160.118.1.el7.x86_64 #1 SMP Thu Apr 4 03:33:23 EDT 2024 x86_64
User: digital (1020)
PHP: 7.2.24
Disabled: NONE
Upload Files
File: //var/lib/pcp/pmdas/prometheus/pmdaprometheus.python
#!/usr/bin/env pmpython
#
# Copyright (c) 2017-2018 Red Hat.
# Copyright (c) 2017 Ronak Jain.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
# for more details.
#
''' Performance Metrics Domain Agent exporting Prometheus endpoint metrics. '''

import os
import re
import time
import pickle
import traceback
import argparse
import threading
import subprocess
from ctypes import c_int
from socket import gethostname
from stat import ST_MODE, S_IXUSR, ST_CTIME
import requests
import sys
if sys.version[0] == '2':
    import Queue as queue
else:
    import queue as queue
import cpmapi as c_api
import cpmda
from pcp.pmapi import pmUnits
from pcp.pmda import PMDA, pmdaMetric, pmdaIndom, pmdaInstid

# Sort config file list - generally needed because os.walk()
# returns a different order on different platforms and/or filesystems.
# Config file list sorting is expensive with a large number of URLs
# and/or scripts.  See the --nosort option to turn it off.
sort_conf_list = True

# Number of seconds to wait between poll attempts on a source that
# we've never been able to connect to & collect a list of metrics from.
# Compared against the age of the last refresh attempt in
# Source.old_enough_for_refresh().
empty_source_pmns_poll = 10.0

MAX_CLUSTER = 0xfff    # largest cluster number, i.e. ~ max. number of prometheus sources
MAX_METRIC = 0x3ff     # largest metric number, i.e. ~ max. number of metrics per source
MAX_INDOM = 0x7fffffff # largest instance number handed out within one metric's indom

# These numbers are combined to create unique numbers for several
# purposes.  The first two types are used only for internal
# pmdaprometheus purposes, and are not visible as indoms to pmapi
# clients.
#
# indom# 0: maps source nicknames to 12-bit "cluster" numbers, cluster#0 is
#           reserved (see the "control" entry interned by the PMDA)
# indom# 1..4095: for each source=cluster, map metric names to pmid numbers
# indom# 4096 + cluster#*1024 ...+1023:  actual pmns indom for each metric
#           (this is Metric.indom_number)

# Printable names for the PM_LABEL_* label-type constants, keyed by the
# numeric constant.
labeltype = {c_api.PM_LABEL_CONTEXT: "PM_LABEL_CONTEXT",
             c_api.PM_LABEL_DOMAIN: "PM_LABEL_DOMAIN",
             c_api.PM_LABEL_INDOM: "PM_LABEL_INDOM",
             c_api.PM_LABEL_CLUSTER: "PM_LABEL_CLUSTER",
             c_api.PM_LABEL_ITEM: "PM_LABEL_ITEM",
             c_api.PM_LABEL_INSTANCES: "PM_LABEL_INSTANCES"}

class Metric(object):
    ''' One metric exported by a Source, plus its most recent values.

    A Metric owns its PCP metadata (pmid, type, semantics, units, PMNS
    name), registers itself with the shared pmda on construction, and
    keeps the instance/value pairs collected by the latest refresh in
    self.values.  Metrics created with a non-empty label set also own a
    PersistentNameTable mapping instance names to instance numbers.
    '''
    def __init__(self,source,name,metricnum,instances,pcpline,helpline,typeline):
        ''' Build the metadata and register the metric with source.pmda.

        source    -- owning Source (supplies pmda, cluster and nickname)
        name      -- metric name as it appears in the exposition text
        metricnum -- per-source metric number (last pmid component)
        instances -- dict of labels from the first sample, or None/empty
                     for a singular metric (no indom)
        pcpline   -- text of the "# PCP" metadata line, or None
        helpline  -- text of the "# HELP" metadata line, or None
        typeline  -- text of the "# TYPE" metadata line, or None

        A failure of add_metric() is logged via pmda.err(), not raised.
        '''
        self.source = source
        self.name = name
        self.metricnum = metricnum # seen during fetch callbacks
        self.pmid = source.pmda.pmid(source.cluster, metricnum) # add domain/cluster#
        self.indom_number = (MAX_CLUSTER+1 +
                     (source.cluster * (MAX_METRIC+1)) +
                     metricnum)
        self.pcpline = pcpline
        self.typeline = typeline
        self.values = {} # instance-vector-to-value
        self.labels = {} # metric labels
        self.inst_labels = {} # dict of instid:labels (where labels is a dict)
        if instances is not None: # dict is empty for singular metrics (no indom)
            self.labels.update(instances)

        self.assign_metadata()
        if instances:
            self.mindom = self.source.pmda.indom(self.indom_number) # add domain#
            self.indom_table = PersistentNameTable(self.source.pmda,
                                   self.indom_number, MAX_INDOM)
        else:
            self.mindom = c_api.PM_INDOM_NULL
            self.indom_table = None
        self.obj = pmdaMetric(self.pmid,self.mtype,self.mindom,self.msem,self.munits)
        help_oneline, help_text = self._split_help(helpline)

        try:
            self.source.pmda.add_metric(self.mname, self.obj, help_oneline, help_text)
            self.source.pmda.set_need_refresh()
        except Exception as e:
            self.source.pmda.err("Cannot add metric %s (%d): %s" % (self.mname, self.pmid, e))
        else:
            # Only claim the metric was created when add_metric() succeeded.
            if self.source.pmda.dbg:
                self.source.pmda.debug("created metric %#x (%s) labels='%s'" % (self.pmid, self.mname, self.labels))


    @staticmethod
    def _split_help(helpline):
        ''' Unescape a HELP string; return (one-line help, remaining text).

        Backslash-backslash becomes one backslash and backslash-n becomes
        a newline; any other backslash sequence is kept verbatim.  The
        scan is a single left-to-right pass so that an escaped backslash
        followed by a literal "n" is not mistaken for a newline escape.
        A missing or empty helpline yields two empty strings.
        '''
        if not helpline: # it could be None!
            return '', ''

        def unescape(match):
            char = match.group(1)
            if char == 'n':
                return '\n'
            if char == '\\':
                return '\\'
            return match.group(0)

        unescaped = re.sub(r'\\(.)', unescape, helpline)
        help_oneline, _, help_text = unescaped.partition('\n')
        return help_oneline, help_text


    def assign_metadata(self):
        ''' Compute metric metadata self.{mtype,mname,msem,munits}
        from the available information already stored in self.

        Semantics come from the second word of the PCP line when it is
        one of instant/counter/discrete, otherwise they are inferred from
        the TYPE line and the "_"-separated words of the metric name.
        Units are always inferred from the metric name and TYPE line.
        '''
        self.mtype = c_api.PM_TYPE_DOUBLE # NB: allow overriding sometime

        # Split the prometheus metric name by "_", to help
        # decode convention for squishing unit/scale data.
        pieces = self.name.split('_')
        if self.pcpline:
            pcpline_pieces = self.pcpline.split(' ')
        else:
            pcpline_pieces = []

        len_pcpline_pieces = len(pcpline_pieces)
        if len_pcpline_pieces >= 2 and pcpline_pieces[1] == 'instant':
            self.msem = c_api.PM_SEM_INSTANT
        elif len_pcpline_pieces >= 2 and pcpline_pieces[1] == 'counter':
            self.msem = c_api.PM_SEM_COUNTER
        elif len_pcpline_pieces >= 2 and pcpline_pieces[1] == 'discrete':
            self.msem = c_api.PM_SEM_DISCRETE
        elif self.typeline == 'counter' or 'total' in pieces or 'count' in pieces or 'sum' in pieces:
            self.msem = c_api.PM_SEM_COUNTER
        elif self.typeline == 'histogram' and 'bucket' in pieces:
            self.msem = c_api.PM_SEM_COUNTER
        else:
            self.msem = c_api.PM_SEM_INSTANT

        # The PCP line, when present, carries the original PCP metric name;
        # otherwise derive a PMNS-friendly name from the prometheus one.
        if len_pcpline_pieces >= 1:
            self.mname = 'prometheus.' + self.source.name + '.' + pcpline_pieces[0]
        else:
            self.mname = 'prometheus.' + self.source.name + '.' + self.name.replace(":", ".")

        # NB: ignore the PCP units suffix.
        # we could do:

        # self.munits = cpmapi.pmParseUnitsStr(pcpline_pieces[2])

        # ... but we'd have to undo the base-scale conversion done by pmwebd.  So
        # instead we use the inferred unit-scale (even though it doesn't match the
        # original pcp source).  We don't preserve PM_TYPE_* either.

        if ((self.typeline == 'histogram' or self.typeline == 'summary')
            and 'count' in pieces): # regardless of UNIT; the *_sum peer has the proper type
            self.munits = pmUnits(0,0,1,0,0,0) # simple count
        elif self.typeline == 'histogram' and 'bucket' in pieces: # ditto
            self.munits = pmUnits(0,0,1,0,0,0) # simple count
        elif 'seconds' in pieces:
            self.munits = pmUnits(0,1,0,0,3,0)
        elif 'microseconds' in pieces: # not listed in https://prometheus.io/docs/practices/naming/
            self.munits = pmUnits(0,1,0,0,1,0)
        elif 'bytes' in pieces:
            self.munits = pmUnits(1,0,0,0,0,0)
        else:
            self.munits = pmUnits(0,0,0,0,0,0) # default, dimensionless old-school count

        assert self.munits and self.msem and self.mname and self.mtype


    def clear_values(self):
        ''' Erase all stored instance/value pairs, in anticipation of a new set. '''
        self.values.clear()


    def store_inst(self, labels, value):
        ''' Store given new instance/value pair.

        labels -- dict of label name to label value, or None for a
                  singular metric; must agree with how the metric was
                  created (asserted below)
        value  -- sample value, converted with float()

        For pmwebd-sourced metrics (those with a PCP line) the instance
        name is labels['instance'] verbatim; otherwise it is built from
        the sorted "key:value" pairs joined by single spaces.
        '''

        assert (labels is None) == (self.indom_table is None) # no metric indom flipflop
        if labels is None:
            self.values[c_api.PM_IN_NULL] = float(value)
            return

        if self.pcpline: # pmwebd
            # NB: no quoting/transforms - preserve incoming value verbatim
            instname = labels['instance']
        else:
            self.indom_table.prefix_mode = True # Mark for instance# prefixed names
            instname = ' '.join(key + ":" + val
                                for key, val in sorted(labels.items()))
            self.labels.update(labels)
        inst = self.indom_table.intern_lookup_value(instname)

        self.values[inst] = float(value)
        self.inst_labels[inst] = labels
        if self.source.pmda.dbg:
            self.source.pmda.debug('store inst=%d instname="%s" value="%s"' % (inst, instname, value))
            self.source.pmda.debug('store inst_labels[%d] = %s' % (inst, labels))

    def save(self):
        ''' Flush the instance name table, if this metric has one. '''
        if self.indom_table is not None:
            self.indom_table.save()

    def fetch_inst(self, inst): # fetch instance
        ''' Return [value, 1] for a stored instance, else [error code, 0].

        PM_ERR_AGAIN is returned when no values at all are stored and
        PM_ERR_INST when only this particular instance is missing.
        '''
        if not self.values:
            # Metric may have disappeared
            return [c_api.PM_ERR_AGAIN, 0]
        if inst in self.values:
            return [self.values[inst], 1]
        return [c_api.PM_ERR_INST, 0]


class PersistentNameTable(object):
    '''Persistent name table.  Answers name-to-number queries by assigning
    persistent ids.  Updates pmda's indom table intermittently.
    Persistent by saving the name list in a pickle file under
    $PCP_VAR_DIR/config/pmda/, named DOMAIN.INDOM.py.  A table may be
    flagged to add an instance-number prefix to all its instance-names,
    for ensuring uniqueness of the sort approved by pmLookupIndom(3).
    '''

    def __init__(self, pmda, indom, maxnum):
        '''Load any saved table and register an indom with the pmda.

        pmda   -- the shared pmda object
        indom  -- indom serial number (without the domain bits)
        maxnum -- largest identifier this table may hand out

        An unreadable or missing store file is not an error: the table
        simply starts empty.
        '''
        self.pmda = pmda
        self.indom = indom
        self.maxnum = maxnum
        self.need_save = False
        self.prefix_mode = False # set later for non-PCP metric instances
        self.store_file_name = '%s/config/pmda/%d.%d.py' % (os.environ['PCP_VAR_DIR'],
                                    pmda.domain, indom)
        try: # slightly used!
            with open(self.store_file_name, 'rb') as f:
                self.instances = self._unpickle(f)
                try:
                    # Fetch the prefixness of the mapping early, so we send
                    # the correct indom strings to the C code the first time.
                    self.prefix_mode = self._unpickle(f)
                except Exception:
                    pass # no second record in the file; keep the default
                if self.pmda.dbg:
                    self.pmda.debug("loaded %s%s, %d instances" %
                        (self.store_file_name,
                         (" (pfx)" if self.prefix_mode else ""),
                         len(self.instances)))
                self.need_save = True # to push values down into c pmda layer
                # XXX: need separate flag for 'push values to file'; too much i/o
        except Exception: # new!
            self.instances = [] # won't be saved till nonempty

        self.names_to_instances = {}
        for i, n in enumerate(self.instances):
            self.names_to_instances[n] = i
        # prime the pmda indom table with an empty list; the pmdaIndom
        # object will be replaced shortly within .replace_indom()
        self.pmdaindom = pmdaIndom(pmda.indom(self.indom), [])
        self.pmda.add_indom(self.pmdaindom)
        self.save() # replace it with real data now, if non-empty

    @staticmethod
    def _unpickle(f):
        '''Read the next pickled object from the open binary file f.

        Python 2's pickle.load() takes no encoding argument, so passing
        one there raises TypeError; the caller's broad handler would then
        silently throw the saved table away.
        '''
        # SECURITY NOTE: pickle can execute code embedded in the file it
        # reads, so the store file must be writable only by trusted users.
        if sys.version_info[0] >= 3:
            return pickle.load(f, encoding="bytes")
        return pickle.load(f)

    def save(self):
        '''Push values to the PMDA layer as well as the backing store file.

        Does nothing unless need_save is set.  The flag is cleared only
        after the file has been written, so a failed write is retried on
        the next call.  Write errors are logged via pmda.err(), not raised.
        '''
        if not self.need_save:
            return

        # save to the pmda C layer
        indom_array = [] # pmdaInstid array, computed on demand
        if self.pmda.dbg:
            self.pmda.debug("indom %d:" % (self.pmda.indom(self.indom)))
        for i, n in enumerate(self.instances):
            if self.prefix_mode:
                instname = str(i) + " " + str(n)
            else:
                instname = str(n)
            if self.pmda.dbg:
                self.pmda.debug("%4d:\t%s" % (i, instname))
            indom_array.append(pmdaInstid(c_int(i), instname))

        # NB: use replace_indom(int,[...]) overload, to pass indom_array,
        # else it gets ignored
        self.pmda.replace_indom(self.pmda.indom(self.indom), indom_array)
        self.pmda.set_need_refresh()
        # save to disk too
        try: # slightly used!
            with open(self.store_file_name,'wb') as f:
                pickle.dump(self.instances, f, protocol=0) # 0: most compatible
                pickle.dump(self.prefix_mode, f, protocol=0)
                if self.pmda.dbg:
                    self.pmda.debug("saved %s%s, %d instances" %
                        (self.store_file_name,
                         (" (pfx)" if self.prefix_mode else ""),
                         len(self.instances)))
            self.need_save = False # reset only on success
        except Exception as e:
            self.pmda.err("cannot save %s: %s" % (self.store_file_name, e))


    def intern_lookup_value(self, name):
        '''Add/lookup given name, return its persistent identifier.

        A new name gets the next unused number and marks the table as
        needing a save.  Raises ValueError once that next number would
        exceed maxnum.
        '''
        if name in self.names_to_instances: # fast path
            # mapping the translated name to inst
            return self.names_to_instances[name]

        # new name
        num = len(self.instances)
        if num > self.maxnum:
            raise ValueError('Too many (%d) different names' % num)
        self.instances.append(name)
        self.names_to_instances[name] = num
        assert self.instances[num] == name
        self.need_save = True
        return num # the new inst number


class SampleLineParser(object):
    '''Lexer for a single "metric [{labels}] value [timestamp]" line.

    The line is consumed one character at a time; self.state always holds
    the bound method that handles the next character.  A hand-written
    state machine is used because label values are quoted freeform
    strings that may contain punctuation, so substring searching is not
    enough.

    After construction the results are in self.name (str), self.labels
    (dict, or None when the line had no {...} part) and self.value (str).
    Malformed label syntax raises ValueError; a line without a name or a
    value trips an assertion.
    '''

    def __init__(self, line):
        # run state machine
        self.state = self.parse_metric_name_start

        # mis-initialize output variables to force state
        # machine transitions to do it right
        self.name = self.value = self.labels = None
        self.lname = self.lvalue = None

        for char in line:
            self.state(char)

        assert self.name
        assert self.value

    # --- metric name ---------------------------------------------------

    def parse_metric_name_start(self, char):
        # skip leading blanks; the first real character opens the name
        if not char.isspace():
            self.name = char
            self.state = self.parse_metric_name

    def parse_metric_name(self, char):
        if char == '{':
            self.labels = {}
            self.state = self.parse_label_name_start
            return
        if char.isspace():
            self.state = self.parse_post_metric_name
            return
        self.name += char

    def parse_post_metric_name(self, char):
        if char.isspace():
            return
        if char == '{':
            self.labels = {}
            self.state = self.parse_label_name_start
            return
        # anything else is the first character of the value
        self.value = char
        self.state = self.parse_value

    # --- labels --------------------------------------------------------

    def parse_label_name_start(self,char):
        if char.isspace():
            return
        if char == '}':
            self.state = self.parse_post_labels
            return
        self.lname = char
        self.state = self.parse_label_name

    def parse_label_name(self, char):
        if char == '=':
            self.state = self.parse_label_value_start
        elif char.isspace():
            self.state = self.parse_label_equals
        else:
            self.lname += char

    def parse_label_equals(self, char):
        if char.isspace():
            return
        if char != '=':
            raise ValueError("Expected =")
        self.state = self.parse_label_value_start

    def parse_label_value_start(self, char):
        if char.isspace():
            return
        if char != '"':
            raise ValueError("Expected \"")
        self.lvalue = ""
        self.state = self.parse_label_value

    def parse_label_value(self, char):
        if char == '"':
            # closing quote: commit the finished label
            self.state = self.post_label_value
            self.labels[self.lname] = self.lvalue
            self.lname = None
            self.lvalue = None
            return
        if char == '\\':
            self.state = self.parse_label_value_escapechar
            return
        self.lvalue += char

    def parse_label_value_escapechar(self, char):
        # XXX: maybe pass through \-escaped identifiers literally through to pcp
        # otherwise we'd have to filter them out at indom-instance-name creation
        decoded = {'\\': '\\', 'n': '\n', '"': '\"'}
        # unknown escapes \XYZ are transcribed literally, backslash included
        self.lvalue += decoded.get(char, '\\' + char)
        self.state = self.parse_label_value

    def post_label_value(self, char):
        if char.isspace():
            return
        followers = {',': self.parse_label_name_start,
                     '}': self.parse_post_labels}
        if char not in followers:
            raise ValueError("Expected , or }")
        self.state = followers[char]

    def parse_post_labels(self, char):
        if not char.isspace():
            self.value = char
            self.state = self.parse_value

    # --- value and trailer ---------------------------------------------

    def parse_value(self, char):
        if not char.isspace():
            self.value += char
        else: # timestamp possibly following
            self.state = self.parse_chomp

    def parse_chomp(self, char): # ignored stuff
        return


class Source(object):
    '''An instance of this class represents a distinct Prometheus exporter,
    identified by a nickname (the next PMNS component beneath prometheus.*),
    and a URL.  Metrics will show up at prometheus.NICKNAME.MET.RIC as/when
    the source is online.

    A source is either a ".url" config file naming an endpoint to GET, or
    an executable whose stdout is the exposition text.  refresh1() obtains
    the text and refresh2() parses it into Metric objects.
    '''

    def __init__(self, name, cluster, path, is_scripted, pmda):
        '''Record the source identity and create its metric-number table.

        name        -- source nickname
        cluster     -- unique/persistent id number for the nickname
        path        -- pathname of the .url file or the executable
        is_scripted -- true when path is an executable, not a .url file
        pmda        -- the shared pmda object
        '''
        self.name = name # source nickname
        self.cluster = cluster # unique/persistent id# for nickname
        self.path = path # pathname to .url or executable file
        self.url = None
        self.parse_url_time = 0 # timestamp of config file when it was last parsed
        self.is_scripted = is_scripted
        self.pmda = pmda # the shared pmda
        self.requests = None
        self.headers = None
        self.filterlist = None
        self.document = None

        self.refresh_time = 0 # "never"
        if not is_scripted:
            # source is a URL.  Create a session for it.
            self.requests = self.pmda.requests # allow persistent connections etc.
            self.headers = {} # dict of headers for the http get
            self.filterlist = [] # list of filters

        # persistently assign numeric id to our metrics
        self.pmids_table = PersistentNameTable(self.pmda, cluster, MAX_METRIC)

        self.metrics_by_name = {} # name -> Metric
        self.metrics_by_num = {} # number (last component of pmid) -> Metric


    def old_enough_for_refresh(self):
        '''But what is "old"?  If it is empty (no metrics), then it
        has probably never been connected to successfully.

        OTOH, if it hasn't been fetched from "recently", there may
        be new metrics.  So we could track the last time a fetch was
        done to this source, and "time out" the PMNS from it.

        Currently returns True only for a source with no metrics whose
        last refresh attempt is older than empty_source_pmns_poll seconds.
        '''
        now = time.time()
        last_try_age = now - self.refresh_time
        return len(self.metrics_by_name) == 0 and last_try_age > empty_source_pmns_poll

    def check_filter(self, name, entrytype):
        '''
        return True if name of type entrytype ("METRIC" or "LABEL")
        is included (and not excluded) by filters for this source.
        First match prevails. No match => True

        Filter entries with fewer than two words cannot name an entry
        type and are skipped rather than raising IndexError.
        '''
        if self.filterlist is None:
            return True
        for f in self.filterlist:
            fs = f.split() # INCLUDE|EXCLUDE METRIC|LABEL regex
            if len(fs) < 2:
                continue # malformed entry, no entry type to compare
            if fs[1] == entrytype:
                pat = ' '.join(fs[2:]) # regex may have spaces
                rx = self.pmda.lookup_regex(pat)
                if self.pmda.dbg:
                    self.pmda.debug("check_filter(%s) fs=%s regex='%s'" % (name,fs,pat))
                if rx.match(name):
                    if self.pmda.dbg:
                        self.pmda.debug(".... rx.match(%s,%s) -> True" % (pat,name))
                    return fs[0] == "INCLUDE"
        # no regex match => include
        return True

    def filter_labelset(self, labelset):
        ''' return labelset trimmed by filters

        Returns None when labelset is None or when no label survives the
        filters; both mean a singular metric to the callers.
        '''
        if labelset is None:
            return None # singular; nothing to include or exclude
        included_labels = {}
        for lname, lval in labelset.items():
            if self.check_filter(lname, "LABEL"):
                included_labels[lname] = lval
        if included_labels == {}:
            return None # no labels => singular indom
        return included_labels

    def parse_metric_line(self, line, pcpline, helpline, typeline):
        '''
        Parse the sample line, identify/create corresponding metric & instance.

        line is one stripped sample line; pcpline, helpline and typeline
        are the texts of the preceding metadata lines (or None).  Any
        failure is logged with a traceback and otherwise ignored.
        '''
        try:
            sp = SampleLineParser(line)
            if self.pmda.dbg:
                self.pmda.debug("parsed '%s' -> %s %s %s" % (line, sp.name, sp.labels, sp.value))
            included_labels = self.filter_labelset(sp.labels)
            if self.pmda.dbg:
                self.pmda.debug("included_labels '%s'" % (included_labels))
            if sp.name in self.metrics_by_name:
                m = self.metrics_by_name[sp.name]
                assert self.metrics_by_num[m.metricnum] == m
                m.store_inst(included_labels, sp.value)
                return

            # check metric is not excluded by filters
            fullname = "prometheus.%s.%s" % (self.name, sp.name)
            if self.pmda.dbg:
                self.pmda.debug("Checking metric '%s'" % (fullname))
            if not self.check_filter(fullname, "METRIC"):
                self.pmda.log("Metric %s excluded by config filters" % fullname)
                return

            metricnum = self.pmids_table.intern_lookup_value(sp.name)
            m = Metric(self,sp.name,metricnum,included_labels, pcpline,helpline,typeline)
            self.metrics_by_name[sp.name] = m
            self.metrics_by_num[metricnum] = m  # not pmid!
            m.store_inst(included_labels, sp.value)
            self.pmda.set_notify_change()
        except Exception as e:
            self.pmda.err("cannot parse/store %s: %s" % (line, e))
            traceback.print_exc() # traceback can be handy here


    def parse_lines(self, text):
        '''Refresh all the metric metadata as it is found, including creating
        new metrics.  Store away metric values for subsequent
        fetch()es.  Lines that fail to parse are logged by
        parse_metric_line(); we don't try heroics to parse non-compliant
        data.  Return the number of sample lines processed.
        '''

        num_metrics = 0
        lines = text.splitlines()
        pcpline = None
        helpline = None
        typeline = None
        state = "metadata"
        for line in lines:
            if self.pmda.dbg:
                self.pmda.debug("line: %s state: %s" % (line, state))
            l = line.strip() # whitespace
            if l == "": # blank line, ignore, no state change
                continue
            elif l.startswith("#"): # comment
                if state == "metrics":
                    state = "metadata"
                    pcpline = None # NB: throw away previous block's metadata
                    helpline = None
                    typeline = None

                lp = l.split()
                if len(lp) < 2:
                    continue
                # NB: for a well-formed exporter file,
                # the # metadata blocks must precede
                # the metric values; we can ignore
                # lp[2].
                if lp[1] == 'PCP':
                    pcpline = ' '.join(lp[2:])
                elif lp[1] == 'HELP':
                    # assume lp[2] matches metric name
                    helpline = ' '.join(lp[3:])
                elif lp[1] == 'TYPE':
                    # assume lp[2] matches metric name
                    typeline = ' '.join(lp[3:])
                else:
                    pass # ignore other comment lines
            else: # metric{...} value line
                state = "metrics"
                # NB: could verify helpline/typeline lp[2] matches,
                # but we don't have to go out of our way to support
                # non-compliant exporters.
                self.parse_metric_line(l,pcpline,helpline,typeline)
                num_metrics += 1

        # NB: this logic only ever -adds- Metrics to a Source.  If a source
        # stops supplying some metrics, then a PCP app will see a PM_ERR_INST
        # coming back when it tries to fetch them.  We could perhaps keep the
        # set of -current- metrics fresh, i.e., track any metrics that were
        # in the Source but were not processed by any parse_metric_line() call.
        # Then we could remove the Metric, and thereby trigger a PM_ERR_PMID
        # for them.  In both cases though, we have no values.

        return num_metrics

    def parse_url_config(self, filepath):
        '''
        Parse a URL config file. The first line is always the URL.
        Remaining lines are prefixed with a keyword. Supported keywords
        include '#' for a comment, 'HEADER:' to add to the header passed
        to the headers dict parameter to the get() call. Note the ':' are
        important, and spaces are collapsed. e.g.

        http://someserver/someplace/endpoint.html
        # here is an example header
        HEADER: authtoken: some auth token

        # filters are used to include/exclude metric names
        FILTER: {INCLUDE|EXCLUDE} METRIC regex ...

        # filters are also used to include/exclude labels
        FILTER: {INCLUDE|EXCLUDE} LABEL regex ...

        Both the header dict and the filter list are rebuilt from scratch
        on every call, so entries removed from the file stop applying.
        '''

        with open(filepath, 'r') as f:
            conf = f.read().strip().split('\n')
        self.url = conf[0]
        self.headers = {} # headers for the http get
        self.filterlist = [] # filters matching metric names or labels

        for line in conf[1:]:
            if len(line) == 0 or line.startswith('#'):
                continue
            elif line.startswith('HEADER:'):
                header = line.split(':', 2)
                key = ''.join(header[1].split())
                val = ' '.join(header[2:]).lstrip()
                self.headers[key] = val
            elif line.startswith('FILTER:'):
                # strip off 'FILTER:' and any leading space
                # each list entry is {INCLUDE|EXCLUDE} METRIC regex ... to end of line
                # NB: split only once, the regex itself may contain ':'
                self.filterlist.append(line.split(':', 1)[1].lstrip())
            else:
                self.pmda.log('Warning: %s ignored unrecognised config entry "%s"' % (self.url,line))

        if self.pmda.dbg:
            self.pmda.debug("DEBUG url: %s HEADERS: %s" % (self.url,self.headers))
            self.pmda.debug("DEBUG url: %s FILTERS: %s" % (self.url,self.filterlist))

        return

    def refresh1(self, timeout):
        '''
        If the Source config entry is a URL (with ".url" extension), find the
        target URL by reading the .url file, fetch (http GET) the target and
        then save resulting document.

        If the Source config entry is executable, run it, expecting prometheus
        formatted data on its stdout, which is then saved.

        timeout is passed to the http GET only.  All failures are logged
        via pmda.err() and leave self.document untouched.
        '''

        # clear cached values from all my metrics
        for m in self.metrics_by_name.values():
            m.clear_values()
        # XXX: ditch metrics no longer found in document

        # bump the fetch call counter for this source, and for the total (cluster 0)
        self.pmda.stats_fetch_calls[self.cluster] += 1
        self.pmda.stats_fetch_calls[0] += 1

        self.refresh_time = fetch_time = time.time()
        try:
            s = os.stat(self.path)
            if self.is_scripted:
                # check file still exists and is executable
                if not s[ST_MODE] & S_IXUSR:
                    self.pmda.err("cannot execute '%s': not executable by owner" % self.path)
                    return
            elif self.parse_url_time < s[ST_CTIME]:
                # (re)parse the URL from given file
                self.parse_url_config(self.path)
                self.parse_url_time = s[ST_CTIME]
        except Exception as e:
            self.pmda.err("cannot read %s: %s" % (self.path, e))
            return

        # fetch the document
        try:
            if self.is_scripted:
                # Execute file, expecting prometheus metric data on stdout.
                # stderr goes to the PMDA log.  Failures are caught below.
                self.document = subprocess.check_output(self.path, shell=False).decode()
            else:
                # fetch the URL
                if self.url.startswith('file://'):
                    with open(self.url[7:],'r') as f:
                        self.document = f.read()
                else:
                    r = self.requests.get(self.url,headers=self.headers,timeout=timeout)
                    r.raise_for_status() # non-200?  ERROR
                    # NB: the requests package automatically enables http keep-alive and compression
                    self.document = r.text

            # update fetch time counter stats, in ms
            incr = int(1000 * (time.time() - fetch_time))
            self.pmda.stats_fetch_time[self.cluster] += incr
            self.pmda.stats_fetch_time[0] += incr # total for all sources

        except Exception as e:
            self.pmda.err('Warning: cannot fetch URL or execute script %s: %s' % (self.path, e))
            return


    def refresh2(self, timeout):
        '''
        Parse the saved document that was recently saved in refresh1().

        Does nothing when there is no document.  Afterwards the lookup
        tables are saved and the document is dropped.  timeout is unused.
        '''
        if self.document is None: # error during fetch?
            return

        # parse and handle the prometheus formatted metric data
        parse_time = time.time()
        s = self.parse_lines(self.document)

        # update parse time counter stats, in ms
        incr = int(1000 * (time.time() - parse_time))
        self.pmda.stats_parse_time[self.cluster] += incr
        self.pmda.stats_parse_time[0] += incr # total

        # save metric & indom lookup tables changes, if any
        for n, m in self.metrics_by_name.items():
            try: # NB: must process whole list even if exceptions escape
                m.save()
            except Exception as e: # belt & suspenders, but don't hide it
                self.pmda.err("cannot save instances of %s: %s" % (n, e))
        self.pmids_table.save()

        if self.pmda.dbg:
            self.pmda.debug("fetched %d bytes with %d metrics from URL or script %s" % (len(self.document), s, self.path))
        self.document = None  # don't hang onto it


    def fetch(self, item, inst):
        ''' Retrieve metric/instance values that ought to have been found
        by a recent refresh().  PM_ERR_AGAIN signals a no go.

        item is the metric number (last pmid component), inst the
        instance number.  Returns a [value-or-error-code, status] pair.
        '''

        try:
            m = self.metrics_by_num[item]
            return m.fetch_inst(inst)
        except Exception as e:
            self.pmda.log("Warning: cannot fetch item %d inst %d: %s" % (item,inst,e))
            return [c_api.PM_ERR_AGAIN, 0]

class PrometheusPMDA(PMDA):
    ''' PMDA exporting metrics collected from Prometheus end-points.

    Every source (a ".url" file or an executable script found below the
    configuration directory) is given a persistent cluster number.
    Cluster 0 is reserved for the "control" statistics metrics.
    '''

    def __init__(self, pmda_name, domain, config, timeout, user, debugflag, logfile):
        '''
        Initialize the PMDA. This can take a while for large configurations.
        The prometheus entry in pmcd.conf specifies to start up in "notready"
        mode. Once startup and init is complete, we call pmda.pmda_ready()
        to tell PMCD we are ready to process requests, just prior to calling
        run() to enter the main loop. See below.
        '''

        # must first set user and connect to the invoking pmcd
        PMDA.__init__(self, pmda_name, domain, logfile)
        if user is not None:
            self.set_user(user)
            self.log('Note: running as user "%s"' % user)
        self.connect_pmcd()

        # Write debugging messages to log, see --debug cmdline option
        # and the storable metric $(pmda_name).control.debug
        self.dbg = debugflag

        # now everything else may take time
        self.pmda_name = pmda_name
        self.config_dir = os.path.normpath(config)
        self.config_dir_ctime = None
        self.timeout = timeout

        # a single central Session that all our sources can concurrently reuse
        self.requests = requests.Session() # allow persistent connections

        # the list of configured sources
        self.source_by_name = {}
        # persistently assign numeric id to source names -> pmid "cluster" numbers
        self.cluster_table = PersistentNameTable(self, 0, MAX_CLUSTER)
        reserved_cluster = self.cluster_table.intern_lookup_value("control")
        assert reserved_cluster == 0
        self.source_by_cluster = {}

        # compiled regex cache
        self.regex_cache = {}

        # Add a IS_DYNAMIC_ROOT metric that serves as a reminder to
        # pmcd to delegate all pmns requests to us.  Do it early, before
        # other metrics may populate beneath the $(pmda_name).* prefix.
        # NB: use the domain passed to the constructor, not the script's
        # global command line arguments.
        dynamic_pmid = (0 << 31) | (511 << 22) | (domain << 10) | 0
        dynamic_root = pmdaMetric(dynamic_pmid, 0, 0, 0, pmUnits())
        self.add_metric(self.pmda_name, dynamic_root, 'dynamic root for %s metrics' % self.pmda_name)

        # Add statistical metrics
        self.sources_indom = self.indom(0)

        # fetch call counter, per-source end-point
        self.stats_fetch_calls = {0:0} # counter, keyed by cluster number
        self.add_metric('%s.control.calls' % self.pmda_name, pmdaMetric(self.pmid(0, 1),
            c_api.PM_TYPE_U64, self.sources_indom, c_api.PM_SEM_COUNTER,
            pmUnits(0, 0, 1, 0, 0, c_api.PM_COUNT_ONE)),
            'per-end-point source call counter')

        # fetch time counter, per-source end-point
        self.stats_fetch_time = {0:0} # time counter in msec, keyed by cluster number
        self.add_metric('%s.control.fetch_time' % self.pmda_name, pmdaMetric(self.pmid(0, 2),
            c_api.PM_TYPE_U64, self.sources_indom, c_api.PM_SEM_COUNTER,
            pmUnits(0, 1, 0, 0, c_api.PM_TIME_MSEC, 0)), # millisecond counter
            'per-end-point source fetch time counter, excluding parse time')

        # parse time counter, per-source end-point
        self.stats_parse_time = {0:0} # time counter in msec, keyed by cluster number
        self.add_metric('%s.control.parse_time' % self.pmda_name, pmdaMetric(self.pmid(0, 3),
            c_api.PM_TYPE_U64, self.sources_indom, c_api.PM_SEM_COUNTER,
            pmUnits(0, 1, 0, 0, c_api.PM_TIME_MSEC, 0)), # millisecond counter
            'per-end-point source parse time counter, excluding fetch time')

        # verbose/debug messages metric
        self.add_metric('%s.control.debug' % self.pmda_name, pmdaMetric(self.pmid(0, 4),
            c_api.PM_TYPE_U32, c_api.PM_INDOM_NULL, c_api.PM_SEM_DISCRETE,
            pmUnits(0, 0, 0, 0, 0, 0)),
            'debug flag to enable verbose log messages, to enable: pmstore %s.control.debug 1' % self.pmda_name)

        # schedule a refresh
        self.set_need_refresh()

        # store callback for prometheus.control.debug
        self.set_store_callback(self.prom_store_callback)

        # used frequently, but we welcome it only for initial pmns population purposes
        # NB: If exceptions propagate out to cpmda, the log will contain entries such as:
        #     Error: fetch_callback: callback failed
        self.set_refresh_metrics(self.refresh_metrics_for_pmns)
        self.set_refresh_all(self.refresh_some_clusters_for_fetch) # "all" is a misnomer
        self.set_fetch_callback(self.fetch_callback)

        # label callbacks
        self.set_label(self.prom_label)
        self.set_label_callback(self.prom_label_callback)

    def lookup_regex(self, pat):
        ''' Return the compiled regex for pat, compiling and caching it
        on first use. '''
        if pat not in self.regex_cache:
            self.regex_cache[pat] = re.compile(r"%s" % pat)
            if self.dbg:
                self.debug("lookup_regex: added '%s'" % pat)
        return self.regex_cache[pat]

    def assert_source_invariants(self, name=None, cluster=None):
        ''' Assert some invariants about the known sources: the by-name
        and by-cluster tables must refer to the same Source object.
        Raises KeyError for an unknown name or cluster. '''
        if name:
            s = self.source_by_name[name]
            assert s == self.source_by_cluster[s.cluster]
        if cluster:
            s = self.source_by_cluster[cluster]
            assert s == self.source_by_name[s.name]


    def traverse(self, dir, ctime):
        ''' Return (ctime, files) for the tree rooted at dir.

        files lists every file below dir (recursively) whose name does
        not start with ".".  The returned ctime is the newest of the
        ctime passed in (if not None), the ctime of dir, of every
        subdirectory and of every listed file.
        '''
        ret = []
        newest = os.path.getctime(dir)
        if ctime is not None and ctime > newest:
            newest = ctime

        # os.walk() already descends into every subdirectory, so no
        # explicit recursion is needed; we only have to look at the
        # change time of each subdirectory entry as well.
        for path, subdirs, files in os.walk(dir):
            for f in files:
                if f.startswith("."):
                    continue
                fname = os.path.join(path, f)
                newest = max(newest, os.path.getctime(fname))
                ret.append(fname)
            for d in subdirs:
                newest = max(newest, os.path.getctime(os.path.join(path, d)))
        return newest, ret

    def rescan_confdir(self):
        '''Scan the configuration directories for any new .url files
        or scripts.  Ensure there is a Source registered in the
        self.source_by_name dictionary for each one.

        First check if anything in the config dirs has changed lately,
        else do nothing.  This is important because this callback is
        invoked frequently by src/python/pmda.c.
        '''

        traverse_time = time.time()
        dir_ctime, conf_filelist = self.traverse(self.config_dir, self.config_dir_ctime)
        traverse_time = time.time() - traverse_time

        if self.config_dir_ctime is None or self.config_dir_ctime < dir_ctime:
            self.config_dir_ctime = dir_ctime
        else: # no new or changed conf files, don't rescan directory
            return

        self.log("Config change detected, traversed %d config entries in %.04fs, rescanning ..." % (len(conf_filelist), traverse_time))
        nickname_regexp = self.lookup_regex(r"^[A-Za-z][A-Za-z0-9_.]*$")

        # XXX: nuke sources related to removed files
        save_cluster_table = False
        if sort_conf_list:
            # sorted for indom cluster consistency
            conf_filelist = sorted(conf_filelist)
        for conf_file in conf_filelist:
            # compute nickname for source:
            # the part of the filename before .url
            file_split = os.path.splitext(conf_file)
            if self.dbg:
                self.debug("found %s => %s" % (conf_file, file_split))

            # check if it's executable, or has ".url" suffix
            is_scripted = (os.stat(conf_file)[ST_MODE] & S_IXUSR) != 0
            if not is_scripted and (len(file_split) != 2 or file_split[1] != ".url"):
                # ignore this file - not executable and doesn't end in ".url"
                if self.dbg:
                    self.debug("Warning: ignored config file '%s', doesn't end in '.url' and not executable." % conf_file)
                continue
            # convert file path name into a PCP metric name
            name = file_split[0].replace(self.config_dir + "/", "").replace("/",".")
            if name == "control":
                self.err("Warning: ignored config file '%s', '%s.control' is a reserved PMNS subtree for PMDA statistics" % (conf_file, self.pmda_name))
                continue
            if not nickname_regexp.match(name):
                self.err("Warning: ignored config file '%s', unsuitable for PCP namespace" % conf_file)
                continue
            if name in self.source_by_name:
                # this source is already known
                self.assert_source_invariants(name=name)
                continue
            try:
                cluster = self.cluster_table.intern_lookup_value(name)
                source = Source(name, cluster, conf_file, is_scripted, self)
                self.source_by_name[source.name] = source
                self.source_by_cluster[source.cluster] = source

                # initialize statistics
                self.stats_fetch_calls[cluster] = 0
                self.stats_fetch_time[cluster] = 0
                self.stats_parse_time[cluster] = 0

                save_cluster_table = True
                self.log("Found source %s cluster %d" % (name, cluster))
            except Exception as e:
                self.err("Error allocating new cluster/source %s (%s)" % (name, e))
        if save_cluster_table:
            self.cluster_table.save()
            self.set_notify_change()

    def refresh_metrics_for_pmns(self):
        '''Refresh our list of Sources.  Then have each "old" Source do a
        fetch, so as to populate/refresh the PMNS.
        '''
        self.rescan_confdir() # get our Source list up to date

        # do a batch fetch of all empty sources to ensure pmns is populated
        clusters = [k for k, v in self.source_by_cluster.items()
                    if v.old_enough_for_refresh()]
        if clusters:
            self.refresh_some_clusters_for_fetch(clusters)


    def fetch_callback(self, cluster, item, inst):
        ''' Main fetch callback which returns the value of the metric
        as a [value, status] pair, or [error code, 0] on failure. '''
        if cluster == 0: # The reserved 'control' cluster: statistics
            if item == 1: # per-source calls counter
                stats = self.stats_fetch_calls
            elif item == 2: # per-source fetch time counter
                stats = self.stats_fetch_time
            elif item == 3: # per-source parse time counter
                stats = self.stats_parse_time
            elif item == 4: # $(pmda_name).control.debug
                return [self.dbg, 1]
            else:
                return [c_api.PM_ERR_PMID, 0]
            return [stats[inst], 1] if inst in stats else [c_api.PM_ERR_INST, 0]

        # Check for an unknown cluster before anything else, so that it
        # is reported as PM_ERR_PMID rather than as a KeyError escaping
        # from the invariants check into cpmda.
        if cluster not in self.source_by_cluster:
            return [c_api.PM_ERR_PMID, 0]

        self.assert_source_invariants(cluster=cluster)
        try:
            return self.source_by_cluster[cluster].fetch(item, inst) # end-points
        except Exception:
            return [c_api.PM_ERR_AGAIN, 0] # was there before

    def refresh1_worker(self, inqueue, outqueue):
        ''' Thread body: take cluster numbers from inqueue until it is
        empty and run refresh1() for each of them.  Every cluster number
        taken is put on outqueue afterwards, even when refresh1() failed,
        so that the consumer can count on seeing all of them. '''
        while True:
            # A non-blocking get avoids hanging forever when another
            # worker takes the last entry after we saw a non-empty queue.
            try:
                cluster = inqueue.get_nowait()
            except queue.Empty:
                return
            try:
                if cluster > 0:
                    self.assert_source_invariants(cluster=cluster)
                    self.source_by_cluster[cluster].refresh1(self.timeout)
            except Exception as e:
                self.err("Error: Cannot refresh1 cluster %d: %s" % (cluster, e))
            finally:
                outqueue.put(cluster)

    def refresh2_worker(self, cluster):
        ''' Run refresh2() for the given cluster's source; errors are
        logged and not propagated. '''
        try:
            if cluster > 0:
                self.assert_source_invariants(cluster=cluster)
                self.source_by_cluster[cluster].refresh2(self.timeout)
        except Exception as e:
            self.err("Error: Cannot refresh2 cluster %d: %s" % (cluster, e))

    def refresh_some_clusters_for_fetch(self, _clusters):
        '''Called once per pmFetch batch handling, before
        prometheus_fetch_callback calls.  Creates threads to
        fetch data in parallel.
        '''
        clusters = [int(l) for l in _clusters] # convert from PyLong
        inpqueue = queue.Queue()
        workqueue = queue.Queue()
        if self.dbg:
            self.debug("refreshing clusters %s" % clusters)
        for c in clusters:
            inpqueue.put(c)

        # We start up only a limited number of concurrent fetcher threads.  We don't
        # want to open an unlimited number of .url/.conf files nor sockets.
        num_threads = min(100,len(clusters))
        threads = []
        for _ in range(num_threads):
            t = threading.Thread(target=self.refresh1_worker,
                                 args=(inpqueue,workqueue))
            threads.append(t)
            t.daemon = True # allow shutdown if some straggler is still running
            t.start()
            # Each will exit when the input workqueue becomes empty.

        # Consume the documents in the main thread, as they arrive in
        # the workqueue.  Do this single-threaded only because the
        # python vm is effectively single-threaded for computations
        # anyway, so cpu-bound multithreaded apps get bogged down by
        # the big interpreter lock.  The exception handler in
        # refresh1_worker() tries to guarantee that we will get all
        # the cluster numbers show up eventually.
        for _ in clusters: # expected eventual size of workqueue
            cluster = workqueue.get()
            self.refresh2_worker(cluster)
        # XXX: timeout on this too?

    def set_need_refresh(self):
        ''' Flag that a refresh is needed and refresh the pmns. '''
        cpmda.set_need_refresh()
        self.pmns_refresh()

    def prom_label(self, ident, type):
        '''
        return a JSONb formatted string (labelset) for given ident of given type
        '''

        if self.dbg:
            self.debug('prom_label(ident=%d (%#x), type=%s)' % (ident, ident, labeltype[type]))
        if type == c_api.PM_LABEL_INDOM:
            # labels for indom=ident
            indom = ident
            for source in self.source_by_cluster.values():
                for mv in source.metrics_by_name.values():
                    if mv.mindom == indom:
                        return '{"source":"%s"}' % source.name
            return '{}' # no indom labels

        elif type == c_api.PM_LABEL_CLUSTER:
            # Labels for a cluster, i.e. an individual source.
            # Note: if using dbpmda, test this with e.g. label cluster 144.1
            # (you need the domain (default 144) and the cluster number (1, 2, ..)
            try:
                s = self.source_by_cluster[ident]
                if s.is_scripted:
                    # The script should insert a hostname="somewhere" label if it
                    # fetches from a remote host.  This will override the hostname
                    # label inserted by pmcd (at the context label level).  Details
                    # of the label hierarchy rules are in the "PRECEDENCE" section
                    # of the pmLookupLabels(3) man page.
                    return '{"script":"%s","source":"%s"}' % (s.path, s.name)
                else:
                    # extract hostname label from the URL
                    if s.url.startswith("http"):
                        host = s.url.replace("http://", "").replace("https://", "").split("/")[0]
                    elif s.url.startswith("file://"):
                        host = gethostname()
                    else:
                        host = "localhost"
                    if host == "localhost":
                        host = gethostname()
                    return '{"hostname":"%s","source":"%s","url":"%s"}' % (host, s.name, s.url)
            except Exception:
                return '{}' # ignore key error for unknown cluster number

        else: # empty label set for all other types
            return '{}'

    def prom_label_callback(self, indom, inst):
        '''
        return label for given instance ID in given indom

        The result is a labelset string holding an "instname" label plus
        one label for each entry recorded for the instance in the
        metric's inst_labels.  An empty labelset is returned when the
        instance cannot be found.
        '''
        instlabels = ''
        try:
            if self.dbg:
                self.debug('prom_label_callback(indom=%#x, inst=%d)' % (indom, inst))
            for source in self.source_by_cluster.values():
                for mv in source.metrics_by_name.values():
                    if indom != mv.mindom:
                        continue
                    for i, n in enumerate(mv.indom_table.instances):
                        if inst != i:
                            continue
                        # found
                        if self.dbg:
                            self.debug('prom_label_callback: found inst label "%s"' % n)
                        # first add the instname label, which matches the external instance name
                        parts = ['"instname": "%s %s"' % (i, n)]
                        # and now each label for this instance separately;
                        # instances without any labels are not an error
                        if i in mv.inst_labels:
                            if self.dbg:
                                self.debug('prom_label_callback: i=%d mv.inst_labels[%d]=%s' % (i, i, mv.inst_labels[i]))
                            for label, value in mv.inst_labels[i].items():
                                parts.append('"%s":"%s"' % (label, value))
                        instlabels = ",".join(parts)
                        break # enumerate() indices are unique
        except Exception as e:
            self.debug("prom_label_callback: exception %s" % e)
            self.debug(traceback.format_exc())

        return '{%s}' % instlabels

    def prom_store_callback(self, cluster, item, inst, val):
        '''
        To enable verbose log messages: pmstore prometheus.control.debug 1

        Only the control.debug metric (cluster 0, item 4) is storable.
        Returns 0 on success, PM_ERR_BADSTORE for a negative value,
        PM_ERR_PERMISSION for any other metric within the valid
        cluster/item range and PM_ERR_PMID otherwise.
        '''
        if cluster == 0 and item == 4:
            if val < 0:
                return c_api.PM_ERR_BADSTORE
            self.dbg = (val != 0)
            self.log('%s.control.debug: set to %d' % (self.pmda_name, self.dbg))
            return 0

        if cluster < MAX_CLUSTER and item < MAX_METRIC:
            return c_api.PM_ERR_PERMISSION
        return c_api.PM_ERR_PMID

    def debug(self, s):
        ''' Log s with a "debug: " prefix, only when debugging is enabled. '''
        if self.dbg:
            super(PrometheusPMDA, self).log("debug: " + s)

    def log(self, s):
        ''' Write s to the PMDA log. '''
        super(PrometheusPMDA, self).log(s)

    def err(self, s):
        ''' Write s to the PMDA log as an error. '''
        super(PrometheusPMDA, self).err(s)


if __name__ == '__main__':
    # Script entry point: parse the command line, create the PMDA in the
    # "notready" state, populate the PMNS and then enter the main loop.
    parser = argparse.ArgumentParser(
        description='Prometheus PMDA.',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument(
        '-D', '--debug',
        default=False,
        help='enable debug messages in log (default disabled).',
        action='store_true')
    parser.add_argument(
        '-c', '--config',
        type=str,
        default='config.d',
        help='configuration directory')
    parser.add_argument(
        '-d', '--domain',
        type=int,
        default=144,
        help='PMDA domain number (default 144)')
    parser.add_argument(
        '-l', '--log',
        type=str,
        default='prometheus.log',
        help='log filename')
    parser.add_argument(
        '-n', '--nosort',
        default=False,
        help='do not sort the config file list (default is to sort)',
        action='store_true')
    parser.add_argument(
        '-r', '--root',
        type=str,
        default='prometheus',
        help='dynamic PMNS root name')
    parser.add_argument(
        '-t', '--timeout',
        type=int,
        default=2,
        help='HTTP GET timeout for each end-point URL (default 2 seconds)')
    parser.add_argument(
        '-u', '--user',
        type=str,
        default=None,
        help='set the username to run under (default is current user)')

    args = parser.parse_args()
    if args.nosort:
        sort_conf_list = False

    # A relative config directory is taken to be below the PMDA's own
    # directory.  PCP_PMDAS_DIR is only needed (and only looked up) in
    # that case, so an absolute --config works without it.
    if not os.path.isabs(args.config):
        pmdas_dir = os.getenv('PCP_PMDAS_DIR')
        if pmdas_dir is None:
            parser.error('PCP_PMDAS_DIR is not set in the environment, '
                         'use an absolute path for --config')
        args.config = os.path.join(pmdas_dir, args.root, args.config)

    # This PMDA starts up in the "notready" state, see the Install script where
    # the IPC protocol is ipc_prot="binary notready". See also pmcd(1) man page.
    # The "binary notready" setting can also be manually configured in pmcd.conf.
    # Default domain number is PMDA(144), see -d option.
    pmda = PrometheusPMDA(args.root, args.domain, args.config, args.timeout, args.user, args.debug, args.log)

    # Scan initial config, with a ten times longer timeout than usual
    pmda.log("Initializing ... currently in notready state.")
    pmda.timeout = args.timeout * 10
    pmda.refresh_metrics_for_pmns()
    pmda.timeout = args.timeout

    # Tell PMCD that we are now ready to process requests.
    pmda.pmda_ready()
    pmda.log("Ready to process requests")

    # Now enter the main loop
    pmda.run()