Source code for simplematrixbotlib.api

from __future__ import annotations
import json
from nio import (AsyncClient, AsyncClientConfig)
from nio.exceptions import OlmUnverifiedDeviceError
from nio.responses import UploadResponse
import nio
from PIL import Image
import aiofiles.os
import mimetypes
import os
import markdown
import aiohttp
from typing import List, Tuple, Union
import re
import simplematrixbotlib


async def check_valid_homeserver(homeserver: str) -> bool:
    if not (homeserver.startswith('http://')
            or homeserver.startswith('https://')):
        return False

    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(
                    f'{homeserver}/_matrix/client/versions') as response:
                if response.status == 200:
                    return True
        except aiohttp.client_exceptions.ClientConnectorError:
            return False

    return False


def split_mxid(mxid: str) -> Union[Tuple[str, str], Tuple[None, None]]:
    # s = mxid.split(':')
    # if len(s) != 2 or s[0][0] != '@':
    #     return None, None
    # s[0] = s[0][1:]
    match = re.match(
        r'@(?P<localpart>[^:]*):(?P<hostname>(?P<ipv4>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|(?P<ipv6>\[[\da-fA-F:.]{2,45}\])|(?P<dns>[a-zA-Z\d\-.]{1,255}))(?P<port>:\d{1,5})?',
        mxid)
    if match is None:
        return None, None
    return match.group('localpart'), match.group('hostname')


[docs] class Api: """ A class to interact with the matrix-nio library. Usually used by the Bot class, and sparingly by the bot developer. ... Attributes ---------- creds : simplematrixbotlib.Creds config : simplematrixbotlib.Config async_client : simplematrixbotlib.AsyncClient """ def __init__(self, creds: simplematrixbotlib.Creds, config: simplematrixbotlib.Config): """ Initializes the simplematrixbotlib.Api class. Parameters ---------- creds : simplematrixbotlib.Creds config : simplematrixbotlib.Config """ self.creds = creds self.config = config self.async_client: AsyncClient = None
[docs] async def login(self): """ Login the client to the homeserver """ if not self.creds.homeserver: raise ValueError("Missing homeserver") if not self.creds.username: raise ValueError("Missing Username") if not (self.creds.password or self.creds.login_token or self.creds.access_token): raise ValueError( "Missing password, login token, access token. " "Either password, login token or access token must be provided" ) client_config = AsyncClientConfig( max_limit_exceeded=0, max_timeouts=0, store_sync_tokens=True, encryption_enabled=self.config.encryption_enabled) store_path = self.config.store_path os.makedirs(store_path, mode=0o750, exist_ok=True) self.async_client = AsyncClient(homeserver=self.creds.homeserver, user=self.creds.username, device_id=self.creds.device_id, store_path=store_path, config=client_config) if self.creds.access_token: self.async_client.access_token = self.creds.access_token async with aiohttp.ClientSession() as session: async with session.get( f'{self.creds.homeserver}/_matrix/client/r0/account/whoami?access_token={self.creds.access_token}' ) as response: if isinstance(response, nio.responses.LoginError): raise Exception(response) r = json.loads( (await response.text()).replace(":false,", ":\"false\",")) # This assumes there was an error that needs to be communicated to the user. A key error happens in # the absence of an error code -> everything fine, we pass try: raise ConnectionError(f"{r['errcode']}: {r['error']}") except KeyError: pass device_id = r['device_id'] self.async_client.user_id, user_id = (r['user_id'], r['user_id']) if self.creds.username == split_mxid(user_id)[0]: # save full MXID self.creds.username = user_id elif user_id != self.creds.username: raise ValueError( f"Given Matrix ID (username) '{user_id}' does not match the access token. " "This error prevents you from accidentally using the wrong account. " "Resolve this by providing the correct username with your credentials, " f"or reset your session by deleting {self.creds._session_stored_file}" f"{' and ' + self.config.store_path if self.config.encryption_enabled else ''}." ) if device_id != self.creds.device_id: if self.config.encryption_enabled: if self.creds.device_id is not None: raise ValueError( f"Given device ID (session ID) '{device_id}' does not match the access token. " "This is critical, because it may break your verification status unintentionally. " "Fix this by providing the correct credentials matching the stored session " f"{self.creds._session_stored_file}.") else: print( "First run with access token. " "Saving device ID (session ID)...") self.creds.device_id, self.async_client.device_id = (device_id, device_id) self.creds.session_write_file() else: print( "Loaded device ID (session ID) does not match the access token. " "Recovering automatically...") self.creds.device_id, self.async_client.device_id = (device_id, device_id) self.creds.session_write_file() if self.config.encryption_enabled: self.async_client.load_store() else: if self.creds.password: resp = await self.async_client.login( password=self.creds.password, device_name=self.creds.device_name) elif self.creds.login_token: resp = await self.async_client.login( token=self.creds.login_token, device_name=self.creds.device_name) else: raise ValueError( "Can't log in: Missing access token, password, or login token" ) if isinstance(resp, nio.responses.LoginError): raise Exception(resp) self.creds.device_id = resp.device_id self.creds.access_token = resp.access_token if self.async_client.should_upload_keys: await self.async_client.keys_upload()
async def _send_room(self, room_id: str, content: dict, message_type: str = "m.room.message", ignore_unverified_devices: bool = None): """ Send a custom event in a Matrix room. Parameters ----------- room_id : str The room id of the destination of the message. content : dict The content block of the event to be sent. message_type : str, optional The type of event to send, default m.room.message. ignore_unverified_devices : bool, optional Whether to ignore that devices are not verified and send the message to them regardless on a per-message basis. """ try: await self.async_client.room_send( room_id=room_id, message_type=message_type, content=content, ignore_unverified_devices=ignore_unverified_devices or self.config.ignore_unverified_devices) except OlmUnverifiedDeviceError as e: # print(str(e)) print( "Message could not be sent. " "Set ignore_unverified_devices = True to allow sending to unverified devices." ) print("Automatically blacklisting the following devices:") for user in self.async_client.rooms[room_id].users: unverified: List[str] = list() for device_id, device in self.async_client.olm.device_store[ user].items(): if not (self.async_client.olm.is_device_verified(device) or self.async_client.olm.is_device_blacklisted(device) ): self.async_client.olm.blacklist_device(device) unverified.append(device_id) if len(unverified) > 0: print(f"\tUser {user}: {', '.join(unverified)}") await self.async_client.room_send( room_id=room_id, message_type=message_type, content=content, ignore_unverified_devices=ignore_unverified_devices or self.config.ignore_unverified_devices)
[docs] async def send_text_message(self, room_id: str, message: str, msgtype: str = "m.text", reply_to: str = ""): """ Send a text message in a Matrix room. Parameters ----------- room_id : str The room id of the destination of the message. message : str The content of the message to be sent. msgtype : str, optional The type of message to send: m.text (default), m.notice, etc reply_to : str, optional The event id for replying message. """ content = { "msgtype" : msgtype, "body" : message, } if reply_to != "": content['m.relates_to'] = { "m.in_reply_to" : { "event_id" : reply_to } } await self._send_room(room_id=room_id, content=content)
[docs] async def send_markdown_message(self, room_id: str, message, msgtype: str = "m.text"): """ Send a markdown message in a Matrix room. Parameters ----------- room_id : str The room id of the destination of the message. message : str The content of the message to be sent. msgtype : str, optional The type of message to send: m.text (default), m.notice, etc """ await self._send_room(room_id=room_id, content={ "msgtype": msgtype, "body": message, "format": "org.matrix.custom.html", "formatted_body": markdown.markdown(message, extensions=['fenced_code', 'nl2br']) })
[docs] async def send_reaction(self, room_id: str, event, key: str): """ Send a reaction to a message in a Matrix room. Parameters ---------- room_id : str The room id of the destination of the message. event : The event object you want to react to. key: str The content of the reaction. This is usually an emoji, but may technically be any text. """ await self._send_room( room_id=room_id, content={ "m.relates_to": { "event_id": event.event_id, "key": key, "rel_type": "m.annotation" } }, message_type="m.reaction" )
[docs] async def send_image_message(self, room_id: str, image_filepath: str): """ Send an image message in a Matrix room. Parameters ----------- room_id : str The room id of the destination of the message. image_filepath : str The path to the image on your machine. """ mime_type = mimetypes.guess_type(image_filepath)[0] image = Image.open(image_filepath) (width, height) = image.size file_stat = await aiofiles.os.stat(image_filepath) async with aiofiles.open(image_filepath, "r+b") as file: resp, maybe_keys = await self.async_client.upload( file, content_type=mime_type, filename=os.path.basename(image_filepath), filesize=file_stat.st_size) if isinstance(resp, UploadResponse): pass # Successful upload else: print(f"Failed Upload Response: {resp}") content = { "body": os.path.basename(image_filepath), "info": { "size": file_stat.st_size, "mimetype": mime_type, "thumbnail_info": None, "w": width, "h": height, "thumbnail_url": None }, "msgtype": "m.image", "url": resp.content_uri } try: await self._send_room(room_id=room_id, content=content) except: print(f"Failed to send image file {image_filepath}")
[docs] async def send_video_message(self, room_id: str, video_filepath: str): """ Send a video message in a Matrix room. Parameters ---------- room_id : str The room id of the destination of the message. video_filepath : str The path to the video on your machine. """ mime_type = mimetypes.guess_type(video_filepath)[0] file_stat = await aiofiles.os.stat(video_filepath) async with aiofiles.open(video_filepath, "r+b") as file: resp, maybe_keys = await self.async_client.upload( file, content_type=mime_type, filename=os.path.basename(video_filepath), filesize=file_stat.st_size) if isinstance(resp, UploadResponse): pass # Successful upload else: print(f"Failed Upload Response: {resp}") content = { "body": os.path.basename(video_filepath), "info": { "size": file_stat.st_size, "mimetype": mime_type, "thumbnail_info": None }, "msgtype": "m.video", "url": resp.content_uri } try: await self._send_room(room_id=room_id, content=content) except: print(f"Failed to send video file {video_filepath}")