""" Format the alert message. """

import asyncio
import dataclasses
import json
from typing import (
    Any,
    NoReturn,
    Optional
)

from jinja2 import Environment, BaseLoader

# Jinja2 source for the formatted variant of a message.  The leading
# conditional emits a prefix that depends on the alert's severity label and
# status; the remaining lines print the alert name, instance, title and
# description.  Line breaks in the rendered text are written as explicit
# "\n" escapes.
template_raw = (
    "{% if alert['labels']['severity'] == 'critical' and alert['status'] == 'firing' %}"
    "@room"
    "{% elif alert['labels']['severity'] == 'warning' and alert['status'] == 'firing' %}"
    ""
    "{% elif alert['status'] == 'resolved' %}"
    " End of alert "
    "{% else %}"
    ""
    "{% endif %}"
    "{{ alert['labels']['alertname'] }}: {{ alert['labels']['instance'] }}\n"
    "\n"
    "{{ alert['annotations']['title'] }}\n"
    "{{ alert['annotations']['description'] }}\n"
)

# The loader argument expects a loader *instance*; the template itself is
# compiled from the string above, so no template lookup is performed here.
template = Environment(loader=BaseLoader()).from_string(template_raw)


@dataclasses.dataclass
class Message:
    """ A formatted alert, as a plain body plus an optional rendered body. """
    # Plain text: "<title>:\n<description>", prefixed with "@room " when the
    # rendered template contained the "@room" marker.
    body: str
    # Output of the Jinja template with every "@room" marker removed.
    formated_body: Optional[str]


def format_alert(
    alert_json: dict[str, Any]
)->list[Message]:
    """
    Format an alert in json format to a nice string.

    alert_json must contain an 'alerts' list and a top-level 'status' entry.
    Every alert in the list must provide alert['annotations']['title'] and
    alert['annotations']['description']; the template additionally reads
    alert['labels'] and alert['status'].

    Returns one Message per entry of alert_json['alerts'], in the same order.

    Raises KeyError when 'alerts', 'status' or one of the annotation keys
    used for the plain body is missing.
    """
    messages: list[Message] = []
    # Looked up once: it is the same for every alert of the payload.  The
    # template above only reads `alert`; `status` is passed as an additional
    # template variable.
    status = alert_json['status']
    for alert in alert_json['alerts']:
        formated_body = template.render(alert=alert, status=status)
        body = f"{ alert['annotations']['title'] }:\n{ alert['annotations']['description'] }"
        # The template signals "notify the whole room" by emitting "@room".
        # The marker is moved to the front of the plain body and stripped
        # from the rendered one.
        # NOTE(review): the marker is found by substring search, so alert
        # text that itself contains "@room" also triggers this branch --
        # confirm that this is intended.
        if '@room' in formated_body:
            body = '@room ' + body
            formated_body = formated_body.replace('@room', '')
        messages.append(Message(body, formated_body))
    return messages


async def format_alerts(
    alert_queue: asyncio.Queue[dict[str,Any]],
    message_queue: asyncio.Queue[Message]
)->NoReturn:
    """
    Read alerts from alert_queue, format them, and put them in message_queue.

    Runs forever.  Each payload taken from alert_queue is expanded with
    format_alert() and the resulting messages are put on message_queue in
    order.  Exceptions raised while formatting propagate to the caller.
    """
    while True:
        alert = await alert_queue.get()
        try:
            for message in format_alert(alert):
                await message_queue.put(message)
        finally:
            # Always balance the get() above, even when formatting fails or
            # the task is cancelled while waiting on put(); otherwise a
            # caller awaiting alert_queue.join() would never be released.
            alert_queue.task_done()