# matrix-appservice-kakaotalk - A Matrix-KakaoTalk puppeting bridge. # Copyright (C) 2022 Tulir Asokan, Andrew Ferrazzutti # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <https://www.gnu.org/licenses/>. from __future__ import annotations from typing import TYPE_CHECKING, Awaitable import asyncio from mautrix.bridge.commands import HelpSection, command_handler from mautrix.types import Format, SerializerError from mautrix.util import utf16_surrogate from mautrix.util.formatter import ( EntityString, EntityType, MarkdownString, MatrixParser, SimpleEntity, ) from ..kt.types.api.struct import ApiUserType from ..rpc.types import RPCError from .. import puppet as pu, user as u from .typehint import CommandEvent from ..kt.client.errors import CommandException SECTION_ACCOUNT = HelpSection("Account management", 35, "") SECTION_FRIENDS = HelpSection("Friends management", 40, "") SECTION_CHANNELS = HelpSection("Channel management", 45, "") if TYPE_CHECKING: from mautrix.types import UserID from ..kt.types.api.struct import FriendStruct from ..kt.types.bson import Long @command_handler( needs_auth=True, management_only=True, help_section=SECTION_ACCOUNT, help_text="Retrieve your KakaoTalk account information", ) async def whoami(evt: CommandEvent) -> None: await evt.mark_read() try: own_info = await evt.sender.get_own_info(force=True) except SerializerError: evt.sender.log.exception("Failed to deserialize settings struct") own_info = None except CommandException as e: await evt.reply(f"Error from KakaoTalk: {e}") return if own_info: uuid = f"`{own_info.more.uuid}` ({'searchable' if own_info.more.uuidSearchable else 'hidden'})" if own_info.more.uuid else "_none_" await evt.reply( f"You're logged in as **{own_info.more.nickName}** (KakaoTalk ID: {uuid}, internal ID: `{evt.sender.ktid}`)" ) else: await evt.reply( f"You're logged in, but the bridge is unable to retrieve your profile information (internal ID: {evt.sender.ktid})" ) _CMD_CONFIRM_CHANGE_ID = "confirm-change-id" @command_handler( needs_auth=True, management_only=False, help_section=SECTION_ACCOUNT, help_text="Set or change your KakaoTalk ID", help_args="<_KakaoTalk ID_>", aliases=["set-id"], ) async def change_id(evt: CommandEvent) -> None: if len(evt.args) != 1: await evt.reply(f"**Usage:** `$cmdprefix+sp {evt.command} <KakaoTalk ID>`") return new_id = evt.args[0] if len(new_id) > 20: await evt.reply("ID must not exceed 20 characters. Please choose a shorter ID.") return await evt.mark_read() if await evt.sender.client.can_change_uuid(new_id): await evt.reply( "Once set, your KakaoTalk ID can be changed only once! " f"If you are sure that you want to change it, type `$cmdprefix+sp {_CMD_CONFIRM_CHANGE_ID}`. " "Otherwise, type `$cmdprefix+sp cancel`." ) evt.sender.command_status = { "action": "ID change", "room_id": evt.room_id, "next": _confirm_change_id, "new_id": new_id, } else: await evt.reply( f"Cannot change KakaoTalk ID to `{new_id}`. " "That ID might already be in use or have restricted characters. " "Either try a different ID, or try again later." ) async def _confirm_change_id(evt: CommandEvent) -> None: assert evt.sender.command_status new_id = evt.sender.command_status.pop("new_id") evt.sender.command_status = None await evt.mark_read() try: await evt.sender.client.change_uuid(new_id) except CommandException: await evt.reply(f"Failed to change KakaoTalk ID to `{new_id}`. Try again later.") else: await evt.reply(f"Successfully changed ID to `{new_id}`") @command_handler( needs_auth=True, management_only=False, help_section=SECTION_ACCOUNT, help_text="Allow others to search by your KakaoTalk ID", ) def make_id_searchable(evt: CommandEvent) -> Awaitable[None]: return _set_id_searchable(evt, True) @command_handler( needs_auth=True, management_only=False, help_section=SECTION_ACCOUNT, help_text="Prevent others from searching by your KakaoTalk ID", ) def make_id_hidden(evt: CommandEvent) -> Awaitable[None]: return _set_id_searchable(evt, False) async def _set_id_searchable(evt: CommandEvent, searchable: bool) -> None: await evt.mark_read() try: await evt.sender.client.set_uuid_searchable(searchable) except RPCError as e: await evt.reply(str(e)) else: await evt.reply(f"Successfully made KakaoTalk ID {'searchable' if searchable else 'hidden'}") async def _get_search_result_puppet(source: u.User, friend_struct: FriendStruct) -> pu.Puppet: puppet = await pu.Puppet.get_by_ktid(friend_struct.userId) if not puppet.name_set: await puppet.update_info_from_friend(source, friend_struct) return puppet @command_handler( needs_auth=True, management_only=False, help_section=SECTION_FRIENDS, help_text="List all KakaoTalk friends", ) async def list_friends(evt: CommandEvent) -> None: await evt.mark_read() try: resp = await evt.sender.client.list_friends() except CommandException as e: await evt.reply(f"Error while listing friends: {e!s}") return puppets = await asyncio.gather( *[ _get_search_result_puppet(evt.sender, friend_struct) for friend_struct in resp.friends if friend_struct.userType == ApiUserType.NORMAL # NOTE Using NORMAL to avoid listing KakaoTalk bots, which are apparently PLUS users ] ) results = "".join( f"* [{puppet.name}](https://matrix.to/#/{puppet.default_mxid})\n" for puppet in puppets ) if results: await evt.reply(f"{results}") else: await evt.reply("No friends found.") class MentionFormatString(EntityString[SimpleEntity, EntityType], MarkdownString): def format(self, entity_type: EntityType, **kwargs) -> MentionFormatString: if entity_type == EntityType.USER_MENTION: self.entities.append( SimpleEntity( type=entity_type, offset=0, length=len(self.text), extra_info={"user_id": kwargs["user_id"]}, ) ) return self class MentionParser(MatrixParser[MentionFormatString]): fs = MentionFormatString async def _get_id_from_mxid(mxid: UserID) -> Long | None: user = await u.User.get_by_mxid(mxid, create=False) if user and user.ktid: return user.ktid puppet = await pu.Puppet.get_by_mxid(mxid, create=False) return puppet.ktid if puppet else None @command_handler( needs_auth=True, management_only=False, help_section=SECTION_FRIENDS, help_text="Add a KakaoTalk user to your KakaoTalk friends list", help_args="<_KakaoTalk ID_|_Matrix user ID_>", ) def add_friend(evt: CommandEvent) -> Awaitable[None]: return _edit_friend(evt, True) @command_handler( needs_auth=True, management_only=False, help_section=SECTION_FRIENDS, help_text="Remove a KakaoTalk user from your KakaoTalk friends list", help_args="<_KakaoTalk ID_|_Matrix user ID_>", ) def remove_friend(evt: CommandEvent) -> Awaitable[None]: return _edit_friend(evt, False) async def _edit_friend(evt: CommandEvent, add: bool) -> None: if not evt.args: await evt.reply(f"**Usage:** `$cmdprefix+sp {evt.command} <KakaoTalk ID|Matrix user ID>`") return if evt.content.get("format", None) == Format.HTML and evt.content["formatted_body"]: parsed = await MentionParser().parse(utf16_surrogate.add( evt.content["formatted_body"][len(evt.command):].strip() )) if not parsed.entities: await evt.reply("No user found") return if ( len(parsed.entities) > 1 or parsed.entities[0].offset != 0 or parsed.entities[0].length != len(utf16_surrogate.remove(parsed.text)) ): await evt.reply("Can add only one friend at a time") return mxid = parsed.entities[0].extra_info["user_id"] ktid = await _get_id_from_mxid(mxid) if not ktid: await evt.reply("No KakaoTalk user found for this Matrix ID") else: await _edit_friend_by_ktid(evt, ktid, add) else: arg = evt.content.body[len(evt.command):].strip() ktid = await _get_id_from_mxid(arg) if ktid: await _edit_friend_by_ktid(evt, ktid, add) else: await _edit_friend_by_uuid(evt, arg, add) async def _edit_friend_by_ktid(evt: CommandEvent, ktid: Long, add: bool) -> None: await evt.mark_read() try: friend_struct = await evt.sender.client.edit_friend(ktid, add) except RPCError as e: await evt.reply(str(e)) else: await _on_friend_edited(evt, friend_struct, add) async def _edit_friend_by_uuid(evt: CommandEvent, uuid: str, add: bool) -> None: await evt.mark_read() try: friend_struct = await evt.sender.client.edit_friend_by_uuid(uuid, add) except RPCError as e: await evt.reply(str(e)) except CommandException as e: if e.status == -1002: await evt.reply( f"Failed to {'add' if add else 'remove'} friend. Ensure their ID is spelled correctly." ) else: raise else: await _on_friend_edited(evt, friend_struct, add) async def _on_friend_edited(evt: CommandEvent, friend_struct: FriendStruct | None, add: bool): await evt.reply(f"Friend {'added' if add else 'removed'}") if friend_struct: puppet = await pu.Puppet.get_by_ktid(friend_struct.userId) await puppet.update_info_from_friend(evt.sender, friend_struct) @command_handler( needs_auth=True, management_only=False, help_section=SECTION_CHANNELS, help_text="Leave this KakaoTalk channel", ) async def leave(evt: CommandEvent) -> None: if not evt.sender.is_connected: await evt.reply("You are not connected to KakaoTalk chats") return if not evt.is_portal: await evt.reply("This command may only be used in a KakaoTalk channel portal room") return await evt.mark_read() await evt.portal.leave_kakaotalk_channel()