#!/usr/bin/python3
"""Prometheus exporter for OPNsense DHCPv4 lease counts.

Queries the ``/api/dhcpv4/leases/searchLease`` endpoint of the configured
host on every scrape and exposes one gauge,
``opnsense_dhcpv4_active_leases_count``, labelled by interface.

Configuration is read from the environment:

* ``OPNSENSE_EXPORTER_OPS_API`` -- host (and optional port) of the API.
* ``OPNSENSE_EXPORTER_OPS_API_KEY`` -- API key, sent as the basic-auth user.
* ``OPNSENSE_EXPORTER_OPS_API_SECRET`` -- API secret, sent as the basic-auth
  password.
* ``OPNSENSE_DHCP_EXPORTER_PORT`` -- local TCP port the exporter listens on.
"""
import json
import os
import time

import requests
import urllib3
from prometheus_client import start_http_server, PROCESS_COLLECTOR, PLATFORM_COLLECTOR, GC_COLLECTOR
from prometheus_client.core import GaugeMetricFamily, CounterMetricFamily, REGISTRY
from prometheus_client.registry import Collector

# The API request below is made with verify=False; silence the warning urllib3
# would otherwise emit for every unverified HTTPS request.
# NOTE(review): presumably the firewall serves a self-signed certificate --
# confirm, and prefer pinning a CA bundle over disabling verification.
urllib3.disable_warnings()

# Upper bound, in seconds, for the API call. Without a timeout `requests`
# waits forever, so an unresponsive host would hang the scrape.
REQUEST_TIMEOUT_SECONDS = 10


class CustomCollector(Collector):
    """Collector that reports active dynamic DHCPv4 leases per interface."""

    def collect(self):
        """Fetch the lease table and yield the per-interface lease gauge.

        Yields:
            GaugeMetricFamily: ``opnsense_dhcpv4_active_leases_count`` with one
            sample per interface description, whose value is the number of
            distinct MAC addresses holding an active, dynamic lease.

        Raises:
            KeyError: if a required environment variable is not set, or the
                response lacks an expected field.
            requests.RequestException: if the request fails, times out, or
                the API answers with an HTTP error status.
        """
        api_host = os.environ['OPNSENSE_EXPORTER_OPS_API']
        api_key = os.environ['OPNSENSE_EXPORTER_OPS_API_KEY']
        api_secret = os.environ['OPNSENSE_EXPORTER_OPS_API_SECRET']

        result = requests.get(
            f'https://{api_host}/api/dhcpv4/leases/searchLease',
            verify=False,
            auth=(api_key, api_secret),
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
        # Fail with a clear HTTP error (e.g. bad credentials) instead of an
        # opaque KeyError / JSON decode error further down.
        result.raise_for_status()

        # Interface description -> set of MACs; the set de-duplicates a MAC
        # that appears in more than one lease row on the same interface.
        leases = {}
        for lease in result.json()['rows']:
            if lease['state'] == 'active' and lease['type'] == 'dynamic':
                leases.setdefault(lease['if_descr'], set()).add(lease['mac'])

        opnsense_dhcpv4_active_leases_count = GaugeMetricFamily(
            'opnsense_dhcpv4_active_leases_count',
            'Number of active DHCPv4 leases per interface',
            labels=['interface'],
        )
        for if_descr, macs in leases.items():
            opnsense_dhcpv4_active_leases_count.add_metric([if_descr], len(macs))
        yield opnsense_dhcpv4_active_leases_count


# Drop the default process/platform/GC collectors from the registry and
# register the lease collector.
REGISTRY.unregister(PROCESS_COLLECTOR)
REGISTRY.unregister(PLATFORM_COLLECTOR)
REGISTRY.unregister(GC_COLLECTOR)
REGISTRY.register(CustomCollector())

if __name__ == '__main__':
    # Listen on loopback only, on the port given in the environment.
    start_http_server(addr='127.0.0.1', port=int(os.environ['OPNSENSE_DHCP_EXPORTER_PORT']))
    # Keep the main thread alive after starting the server.
    # NOTE(review): start_http_server is expected to serve from a background
    # thread -- confirm against the installed prometheus_client version.
    while True:
        time.sleep(100)