#!/usr/bin/env python3
"""Export keepalived VRRP instance states to a Prometheus textfile.

Walks the keepalived object tree on the D-Bus system bus, reads the name and
state of every VRRP instance found, and writes them as a Prometheus ``Enum``
metric to the file given on the command line.
"""
import argparse
import asyncio

from dbus_next import InterfaceNotFoundError
from dbus_next.aio import MessageBus
from dbus_next.constants import BusType
from dbus_next.introspection import Node
from prometheus_client import CollectorRegistry, Enum, write_to_textfile

_BUS_NAME = "org.keepalived.Vrrp1"
_INSTANCE_INTERFACE = "org.keepalived.Vrrp1.Instance"
_ROOT_PATH = "/org/keepalived/Vrrp1/Instance"


def patch(introspect):
    """Return a copy of *introspect* with the ``Name`` property typed ``(s)``.

    The introspection data is converted to XML, the declared type of the
    ``Name`` property on the VRRP instance interface (if that property is
    present) is overwritten with ``(s)``, and a new root ``Node`` is built
    from the result.

    NOTE(review): presumably keepalived advertises ``Name`` with a different
    signature than the value it actually sends; ``traverse`` unpacks the
    value as a one-element sequence, which matches ``(s)``. Confirm against
    the keepalived version in use.
    """
    xml = introspect.to_xml()
    name = xml.find(
        f'interface[@name="{_INSTANCE_INTERFACE}"]'
        '/property[@name="Name"]'
    )
    if name is not None:
        name.attrib["type"] = "(s)"
    return Node.from_xml(xml, is_root=True)


async def traverse(bus, path=_ROOT_PATH):
    """Yield ``(name, state)`` for each VRRP instance at or below *path*.

    The object at *path* is introspected (with the ``Name`` type patched, see
    ``patch``). If it exposes the VRRP instance interface, its name and state
    are yielded; objects without that interface are skipped. Every child path
    is then visited recursively.
    """
    introspect = patch(await bus.introspect(_BUS_NAME, path))
    proxy = bus.get_proxy_object(_BUS_NAME, path, introspect)

    # Only the interface lookup is expected to fail for intermediate nodes,
    # so keep the guarded region to that single call.
    try:
        interface = proxy.get_interface(_INSTANCE_INTERFACE)
    except InterfaceNotFoundError:
        pass
    else:
        # The State property is a pair; only its second element is used.
        _, state = await interface.get_state()
        # With the patched "(s)" signature the name arrives wrapped in a
        # one-element sequence.
        (name,) = await interface.get_name()
        yield name, state

    for child_path in proxy.child_paths:
        async for instance in traverse(bus, child_path):
            yield instance


async def main():
    """Parse the output filename, collect VRRP states and write the metric."""
    parser = argparse.ArgumentParser()
    parser.add_argument("filename")
    args = parser.parse_args()

    registry = CollectorRegistry()
    enum = Enum(
        "keepalived_vrrp_state",
        "Keepalived VRRP state",
        ["name"],
        registry=registry,
        states=("stop", "init", "backup", "master", "fault", "deleted"),
    )

    bus = await MessageBus(bus_type=BusType.SYSTEM).connect()
    try:
        async for name, state in traverse(bus):
            # Lower-cased to match the state names declared on the metric.
            enum.labels(name=name).state(state.lower())
    finally:
        # Release the bus connection even if traversal fails.
        bus.disconnect()

    write_to_textfile(args.filename, registry)


if __name__ == "__main__":
    asyncio.run(main())