""" Format the alert message. """ import asyncio import dataclasses import json from typing import ( Any, NoReturn, Optional ) @dataclasses.dataclass class Message: body: str formated_body: Optional[str] def format_alert( alert: dict[str, Any] )->Message: """ Format an alert in json format to a nice string. """ body = json.dumps(alert, indent=4) formated_body = f"
{body}
\n"
return Message(body, formated_body)
async def format_alerts(
alert_queue: asyncio.Queue[dict[str,Any]],
message_queue: asyncio.Queue[Message]
)->NoReturn:
"""
Read alerts from alert_queue, format them, and put them in message_queue.
"""
while True:
alert = await alert_queue.get()
message = format_alert(alert)
await message_queue.put(message)
alert_queue.task_done()