#!/usr/bin/env python3
import argparse
import asyncio
import dataclasses
import logging
import aiohttp.web
import jinja2
import nio
import yaml
@dataclasses.dataclass
class Config:
matrix_homeserver: str
matrix_user: str
matrix_password: str
matrix_room: str
webhook_token: str
TEMPLATE_TEXT = (
"{% if status == 'resolved' %}"
"RÉSOLU "
"{% elif labels.severity == 'critical' %}"
"@room CRITIQUE "
"{% elif labels.severity == 'warning' %}"
"ATTENTION "
"{% endif %}"
"{{ labels.alertname }}"
"{% if labels.instance is defined %} {{ labels.instance }}{% endif %}"
"{% if annotations.summary is defined %}"
"\n{{ annotations.summary }}"
"{% else %}"
"{% for key, value in annotations.items() %}"
"\n{{ key }} : {{ value }}"
"{% endfor %}"
"{% endif %}"
)
TEMPLATE_HTML = (
"{% if status == 'resolved' %}"
"RÉSOLU "
"{% elif labels.severity == 'critical' %}"
"@room CRITIQUE "
"{% elif labels.severity == 'warning' %}"
"ATTENTION "
"{% endif %}"
"{{ labels.alertname }}"
"{% if labels.instance is defined %} {{ labels.instance }}{% endif %}"
"{% if annotations.summary is defined %}"
"
{{ annotations.summary }}" "{% else %}" "
" "{% for key, value in annotations.items() %}" "{{ key | capitalize }} : {{ value }}" "{% endif %}" ) async def format_messages(config, src, dest): env = jinja2.Environment() template_text = env.from_string(TEMPLATE_TEXT) template_html = env.from_string(TEMPLATE_HTML) while True: message = await src.get() for alert in message["alerts"]: await dest.put( ( template_text.render(**alert), template_html.render(**alert), ) ) async def send_notices(config, messages): client = nio.AsyncClient(config.matrix_homeserver, config.matrix_user) await client.login(config.matrix_password) while True: text, html = await messages.get() await client.room_send( config.matrix_room, message_type="m.room.message", content={ "msgtype": "m.text", "format": "org.matrix.custom.html", "body": text, "formatted_body": html, }, ) async def run_app(config, out, host, port): async def webhook(request): tokens = request.query.getall("token", []) if config.webhook_token not in tokens: logging.warning("Invalid tokens: %s", tokens) raise aiohttp.web.HTTPForbidden("Invalid tokens") message = await request.json() logging.debug("Incoming message: %s", message) await out.put(message) return aiohttp.web.Response() app = aiohttp.web.Application() app.add_routes([aiohttp.web.post("/webhook", webhook)]) runner = aiohttp.web.AppRunner(app) await runner.setup() await aiohttp.web.TCPSite(runner, host, port).start() async def main(): logging.basicConfig(level=logging.INFO) parser = argparse.ArgumentParser() parser.add_argument("-c", "--config", default="config.yaml") parser.add_argument("--host", default="*") parser.add_argument("--port", type=int, default=8000) args = parser.parse_args() with open(args.config) as f: config = yaml.safe_load(f) config = Config(**config) messages = asyncio.Queue() formatted = asyncio.Queue() logging.info("Started Alertbot on %s:%d", args.host, args.port) tasks = ( asyncio.create_task(run_app(config, messages, args.host, args.port)), asyncio.create_task(format_messages(config, messages, formatted)), asyncio.create_task(send_notices(config, formatted)), ) # on propage les exceptions await asyncio.gather(*tasks, return_exceptions=False) if __name__ == "__main__": asyncio.get_event_loop().run_until_complete(main())
" "{% endfor %}" "" "