from __future__ import annotations """ The Client class. Connect to the matrix server and handle interactions with the server. """ import asyncio import logging import nio from aiopath import AsyncPath from typing import ( Any, Awaitable, Callable, Optional, NoReturn, Union ) from .async_utils import Aobject from .invite_policy import ( DeclineAll, InvitePolicy, WhiteList ) from .utils import ( Room, RoomAlias, RoomId ) log = logging.getLogger(__name__) class Client(Aobject): """ Connect to the matrix server and handle interactions with the server. user: the user id of the client (exp: @bot:matrix.org) /!\ The client is initialized asyncronously: `client = await Client(...)` """ __client: nio.AsyncClient __rooms_by_aliases: dict[RoomAlias, Room] __rooms_by_id: dict[RoomId, Room] __sync_token_file: AsyncPath __sync_token:Optional[str] __sync_token_queue: asyncio.Queue[str] __invite_queue: asyncio.Queue[tuple[RoomId, nio.responses.InviteInfo]] __invite_policy: InvitePolicy user: str async def __init__( self, username: str, homeserver: str, password: str, invite_policy: Optional[InvitePolicy]=None, sync_token_file:str="./.sync_token", ): """ Initialize the Client. username: the username used by the bot homeserver: the matrix home server of the bot (expl: "https://matrix.org") password: the password of the user invite_policy: the policy to apply to invitation, default is to decline all. sync_token_file: the file where is stored the last sync token received from the sync responses. This token avoid reloadind all the history of the bot each time we start it. """ self.__client = nio.AsyncClient( homeserver, username ) self.__rooms_by_aliases = {} self.__rooms_by_id = {} self.__sync_token_file = AsyncPath(sync_token_file) if (await self.__sync_token_file.is_file()): async with self.__sync_token_file.open() as file: self.__sync_token = (await file.read()).strip() else: self.__sync_token = None self.__sync_token_queue = asyncio.Queue() self.__invite_queue = asyncio.Queue() self.__invite_policy = invite_policy or DeclineAll() resp = await self.__client.login(password) if isinstance(resp, nio.responses.LoginError): raise RuntimeError(f"Fail to connect: {resp.message}") log.info("logged in") self.user_id = self.__client.user_id async def resolve_room( self, room_name: Union[RoomAlias, RoomId] )->Room: """ Lookup a room from its id or alias. If the name has already been resolved by this client, the room is return directly without querying the server. """ # If the room_name is empty: if len(room_name) == 0: raise ValueError(f"Invalid room_name: {room_name}") # If it is a known room id: if room_name[0] == '!' and room_name in self.__rooms_by_id: return self.__rooms_by_id[room_name] # If it is a unknown room id: elif room_name[0] == '!': room = Room(id=room_name) self.__rooms_by_id[room_name] = room return room # If it is not a room id nor a room alias: elif room_name[0] != '#': raise ValueError(f"Invalid room_name: {room_name}") # If it is a known room alias: elif room_name in self.__rooms_by_aliases: return self.__rooms_by_aliases[room_name] # If it is an unknown room alias: else: resp = await self.__client.room_resolve_alias(room_name) if isinstance(resp, nio.responses.RoomResolveAliasError): raise RuntimeError(f"Error while resolving alias: {resp.message}") # If the room is already known: if resp.room_id in self.__rooms_by_id: room = self.__rooms_by_id[resp.room_id] room.aliases.add(room_name) # If the room is unknwon: else: room = Room(id=resp.room_id,aliases={room_name}) self.__rooms_by_id[resp.room_id] = room self.__rooms_by_aliases[room_name] = room return room def set_invite_policy( self, invite_policy: InvitePolicy ): """ Set the invite policy. """ self.__invite_policy = invite_policy async def send_message( self, room: Union[RoomAlias, RoomId], message: str, ): msg = { "body": message, "msgtype": "m.text" } await self.__send_message(room, msg) async def send_formated_message( self, room: Union[RoomAlias, RoomId], message: str, unformated_message: Optional[str]=None ): msg = { "msgtype": "m.text", "format": "org.matrix.custom.html", "formatted_body": message, "body": unformated_message or message, } await self.__send_message(room, msg) async def send_firework_message( self, room: Union[RoomAlias, RoomId], message: str, ): """ Send message with the firework effect. """ msg = { "body": message, "msgtype": "nic.custom.fireworks" } await self.__send_message(room, msg) async def send_confetti_message( self, room: Union[RoomAlias, RoomId], message: str, ): """ Send message with the confetti effect. """ msg = { "body": message, "msgtype": "nic.custom.confetti" } await self.__send_message(room, msg) async def send_snow_message( self, room: Union[RoomAlias, RoomId], message: str, ): """ Send message with the snowfall effect. """ msg = { "body": message, "msgtype": "io.element.effect.snowfall" } await self.__send_message(room, msg) async def send_space_invader_message( self, room: Union[RoomAlias, RoomId], message: str, ): """ Send message with the space invader effect. """ msg = { "body": message, "msgtype": "io.element.effects.space_invaders" } await self.__send_message(room, msg) async def __save_sync_tocken(self)->NoReturn: """ Save the sync token. """ while True: token = await self.__sync_token_queue.get() async with self.__sync_token_file.open(mode='w') as file: await file.write(token) self.__sync_token_queue.task_done() async def __process_invites(self)->NoReturn: """ Process invites to rooms. """ while True: room_id, invite_info = await self.__invite_queue.get() accept_invite = False if (await self.__invite_policy.accept_invite(room_id, invite_info)): result = await self.__client.join(room_id) if isinstance(result, nio.JoinError): log.warning(f"Error while joinning room {room_id}: {result.message}") else: log.info(f"{room_id} joined") else: result = await self.__client.room_leave(room_id) if isinstance(result, nio.RoomLeaveError): log.warning(f"Error while leaving room {room_id}: {result.message}") else: log.info(f"{room_id} left") self.__invite_queue.task_done() async def __send_message( self, room_name: Union[RoomAlias, RoomId], msg: dict[Any, Any] ): """ Send a message, take a room name and the dict of the message. """ room = await self.resolve_room(room_name) await self.__client.room_send(room.id, "m.room.message", msg) log.debug(f"message sent to room {room.id}") async def __sync( self, sync_timeout:int=30 )->NoReturn: """ Sync with the server every sync_delta seconds. sync_timeout: the max time in sec between each sync. """ while True: sync_resp = await self.__client.sync( sync_timeout*1000, since=self.__sync_token ) if isinstance(sync_resp, nio.responses.SyncError): log.warning(f"Error while syncronizing: {sync_resp.message}") continue self.__sync_token = sync_resp.next_batch await self.__sync_token_queue.put(self.__sync_token) invites = sync_resp.rooms.invite for room_id in invites: await self.__invite_queue.put((room_id, invites[room_id])) def add_message_callback( self, callback: Callable[[Client, nio.rooms.Room, nio.events.room_events.RoomMessageText], Awaitable[None]] ): """ Add a callback called when a message is received. The callback is an async function that take the client, the room and the event message in arg. """ async def new_callback(room: nio.rooms.Room, msg: nio.events.room_events.RoomMessageText): await callback(self, room, msg) self.__client.add_event_callback(new_callback, nio.RoomMessageText) async def run( self, sync_timeout:int=30 )->NoReturn: """ Run the bot: sync with the server and execute callbacks. sync_timeout: the max time in sec between each sync. """ await asyncio.gather( self.__sync(sync_timeout=sync_timeout), self.__save_sync_tocken(), self.__process_invites() )