matrix-appservice-kakaotalk/matrix_appservice_kakaotalk/commands/kakaotalk.py

207 lines
7.2 KiB
Python

# matrix-appservice-kakaotalk - A Matrix-KakaoTalk puppeting bridge.
# Copyright (C) 2022 Tulir Asokan, Andrew Ferrazzutti
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from typing import TYPE_CHECKING
import asyncio
from mautrix.bridge.commands import HelpSection, command_handler
from mautrix.util import utf16_surrogate
from mautrix.util.formatter import (
EntityString,
EntityType,
MarkdownString,
MatrixParser,
SimpleEntity,
)
from ..kt.types.api.struct import ApiUserType
from ..rpc.types import RPCError
from .. import puppet as pu, user as u
from .typehint import CommandEvent
from ..kt.client.errors import CommandException
SECTION_FRIENDS = HelpSection("Friends management", 40, "")
SECTION_CHANNELS = HelpSection("Channel management", 45, "")
if TYPE_CHECKING:
from mautrix.types import UserID
from ..kt.types.api.struct import FriendStruct
from ..kt.types.bson import Long
async def _get_search_result_puppet(source: u.User, friend_struct: FriendStruct) -> pu.Puppet:
puppet = await pu.Puppet.get_by_ktid(friend_struct.userId)
if not puppet.name_set:
await puppet.update_info_from_friend(source, friend_struct)
return puppet
@command_handler(
needs_auth=True,
management_only=False,
help_section=SECTION_FRIENDS,
help_text="List all KakaoTalk friends",
)
async def list_friends(evt: CommandEvent) -> None:
try:
resp = await evt.sender.client.list_friends()
await evt.mark_read()
except CommandException as e:
await evt.reply(f"Error while listing friends: {e!s}")
return
puppets = await asyncio.gather(
*[
_get_search_result_puppet(evt.sender, friend_struct)
for friend_struct in resp.friends if friend_struct.userType == ApiUserType.NORMAL
# NOTE Using NORMAL to avoid listing KakaoTalk bots, which are apparently PLUS users
]
)
results = "".join(
f"* [{puppet.name}](https://matrix.to/#/{puppet.default_mxid})\n" for puppet in puppets
)
if results:
await evt.reply(f"{results}")
else:
await evt.reply("No friends found.")
class MentionFormatString(EntityString[SimpleEntity, EntityType], MarkdownString):
def format(self, entity_type: EntityType, **kwargs) -> MentionFormatString:
if entity_type == EntityType.USER_MENTION:
self.entities.append(
SimpleEntity(
type=entity_type,
offset=0,
length=len(self.text),
extra_info={"user_id": kwargs["user_id"]},
)
)
return self
class MentionParser(MatrixParser[MentionFormatString]):
fs = MentionFormatString
async def _get_id_from_mxid(mxid: UserID) -> Long | None:
user = await u.User.get_by_mxid(mxid, create=False)
if user and user.ktid:
return user.ktid
puppet = await pu.Puppet.get_by_mxid(mxid, create=False)
return puppet.ktid if puppet else None
@command_handler(
needs_auth=True,
management_only=False,
help_section=SECTION_FRIENDS,
help_text="Add a KakaoTalk user to your KakaoTalk friends list",
help_args="<_KakaoTalk ID_|_Matrix user ID_>",
)
async def add_friend(evt: CommandEvent) -> None:
await _edit_friend(evt, True)
@command_handler(
needs_auth=True,
management_only=False,
help_section=SECTION_FRIENDS,
help_text="Remove a KakaoTalk user from your KakaoTalk friends list",
help_args="<_KakaoTalk ID_|_Matrix user ID_>",
)
async def remove_friend(evt: CommandEvent) -> None:
await _edit_friend(evt, False)
async def _edit_friend(evt: CommandEvent, add: bool) -> None:
if not evt.args:
await evt.reply(f"**Usage:** `$cmdprefix+sp {evt.command} <KakaoTalk ID|Matrix user ID>`")
return
formatted_body = evt.content.get("formatted_body")
if formatted_body:
arg = formatted_body[len(evt.command):].strip()
parsed = await MentionParser().parse(utf16_surrogate.add(arg))
if not parsed.entities:
await evt.reply("No user found")
return
if (
len(parsed.entities) > 1 or
parsed.entities[0].offset != 0 or
parsed.entities[0].length != len(utf16_surrogate.remove(parsed.text))
):
await evt.reply("Can add only one friend at a time")
return
mxid = parsed.entities[0].extra_info["user_id"]
ktid = await _get_id_from_mxid(mxid)
if not ktid:
await evt.reply("No KakaoTalk user found for this Matrix ID")
else:
await _edit_friend_by_ktid(evt, ktid, add)
else:
arg = evt.content.body[len(evt.command):].strip()
ktid = await _get_id_from_mxid(arg)
if ktid:
await _edit_friend_by_ktid(evt, ktid, add)
else:
await _edit_friend_by_uuid(evt, arg, add)
async def _edit_friend_by_ktid(evt: CommandEvent, ktid: Long, add: bool) -> None:
try:
friend_struct = await evt.sender.client.edit_friend(ktid, add)
except RPCError as e:
await evt.reply(str(e))
else:
await _on_friend_edited(evt, friend_struct, add)
async def _edit_friend_by_uuid(evt: CommandEvent, uuid: str, add: bool) -> None:
try:
friend_struct = await evt.sender.client.edit_friend_by_uuid(uuid, add)
except RPCError as e:
await evt.reply(str(e))
except CommandException as e:
if e.status == -1002:
await evt.reply(
f"Failed to {'add' if add else 'remove'} friend. Ensure their ID is spelled correctly."
)
else:
raise
else:
await _on_friend_edited(evt, friend_struct, add)
async def _on_friend_edited(evt: CommandEvent, friend_struct: FriendStruct | None, add: bool):
await evt.reply(f"Friend {'added' if add else 'removed'}")
if friend_struct:
puppet = await pu.Puppet.get_by_ktid(friend_struct.userId)
await puppet.update_info_from_friend(evt.sender, friend_struct)
@command_handler(
needs_auth=True,
management_only=False,
help_section=SECTION_CHANNELS,
help_text="Leave this KakaoTalk channel",
)
async def leave(evt: CommandEvent) -> None:
if not evt.sender.is_connected:
await evt.reply("You are not connected to KakaoTalk chats")
return
if not evt.is_portal:
await evt.reply("This command may only be used in a KakaoTalk channel portal room")
return
await evt.mark_read()
await evt.sender.client.leave_channel(evt.portal.channel_props)
await evt.sender.on_channel_left(evt.portal.ktid, evt.portal.kt_type)