322 lines
11 KiB
Python
322 lines
11 KiB
Python
# matrix-appservice-kakaotalk - A Matrix-KakaoTalk puppeting bridge.
|
|
# Copyright (C) 2022 Tulir Asokan, Andrew Ferrazzutti
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
from __future__ import annotations
|
|
|
|
from typing import TYPE_CHECKING, Awaitable
|
|
import asyncio
|
|
|
|
from mautrix.bridge.commands import HelpSection, command_handler
|
|
from mautrix.types import Format, SerializerError
|
|
from mautrix.util import utf16_surrogate
|
|
from mautrix.util.formatter import (
|
|
EntityString,
|
|
EntityType,
|
|
MarkdownString,
|
|
MatrixParser,
|
|
SimpleEntity,
|
|
)
|
|
|
|
from ..kt.types.api.struct import ApiUserType
|
|
from ..rpc.types import RPCError
|
|
|
|
from .. import puppet as pu, user as u
|
|
from .typehint import CommandEvent
|
|
|
|
from ..kt.client.errors import CommandException
|
|
|
|
SECTION_ACCOUNT = HelpSection("Account management", 35, "")
|
|
SECTION_FRIENDS = HelpSection("Friends management", 40, "")
|
|
SECTION_CHANNELS = HelpSection("Channel management", 45, "")
|
|
|
|
if TYPE_CHECKING:
|
|
from mautrix.types import UserID
|
|
|
|
from ..kt.types.api.struct import FriendStruct
|
|
from ..kt.types.bson import Long
|
|
|
|
|
|
@command_handler(
|
|
needs_auth=True,
|
|
management_only=True,
|
|
help_section=SECTION_ACCOUNT,
|
|
help_text="Retrieve your KakaoTalk account information",
|
|
)
|
|
async def whoami(evt: CommandEvent) -> None:
|
|
await evt.mark_read()
|
|
try:
|
|
own_info = await evt.sender.get_own_info(force=True)
|
|
except SerializerError:
|
|
evt.sender.log.exception("Failed to deserialize settings struct")
|
|
own_info = None
|
|
except CommandException as e:
|
|
await evt.reply(f"Error from KakaoTalk: {e}")
|
|
return
|
|
if own_info:
|
|
puppet = await pu.Puppet.get_by_ktid(evt.sender.ktid)
|
|
uuid = f"`{own_info.more.uuid}` ({'searchable' if own_info.more.uuidSearchable else 'hidden'})" if own_info.more.uuid else "_none_"
|
|
await evt.reply(
|
|
f"You're logged in as **{own_info.more.nickName}**"
|
|
f"\n* KakaoTalk ID: {uuid}"
|
|
f"\n* Internal ID: `{evt.sender.ktid}`"
|
|
f"\n* Matrix user: [{puppet.name}](https://matrix.to/#/{puppet.default_mxid})"
|
|
)
|
|
else:
|
|
await evt.reply(
|
|
f"You're logged in, but the bridge is unable to retrieve your profile information (internal ID: {evt.sender.ktid})"
|
|
)
|
|
|
|
|
|
_CMD_CONFIRM_CHANGE_ID = "confirm-change-id"
|
|
|
|
@command_handler(
|
|
needs_auth=True,
|
|
management_only=False,
|
|
help_section=SECTION_ACCOUNT,
|
|
help_text="Set or change your KakaoTalk ID",
|
|
help_args="<_KakaoTalk ID_>",
|
|
aliases=["set-id"],
|
|
)
|
|
async def change_id(evt: CommandEvent) -> None:
|
|
if len(evt.args) != 1:
|
|
await evt.reply(f"**Usage:** `$cmdprefix+sp {evt.command} <KakaoTalk ID>`")
|
|
return
|
|
new_id = evt.args[0]
|
|
if len(new_id) > 20:
|
|
await evt.reply("ID must not exceed 20 characters. Please choose a shorter ID.")
|
|
return
|
|
await evt.mark_read()
|
|
if await evt.sender.client.can_change_uuid(new_id):
|
|
await evt.reply(
|
|
"Once set, your KakaoTalk ID can be changed only once! "
|
|
f"If you are sure that you want to change it, type `$cmdprefix+sp {_CMD_CONFIRM_CHANGE_ID}`. "
|
|
"Otherwise, type `$cmdprefix+sp cancel`."
|
|
)
|
|
evt.sender.command_status = {
|
|
"action": "ID change",
|
|
"room_id": evt.room_id,
|
|
"next": _confirm_change_id,
|
|
"new_id": new_id,
|
|
}
|
|
else:
|
|
await evt.reply(
|
|
f"Cannot change KakaoTalk ID to `{new_id}`. "
|
|
"That ID might already be in use or have restricted characters. "
|
|
"Either try a different ID, or try again later."
|
|
)
|
|
|
|
async def _confirm_change_id(evt: CommandEvent) -> None:
|
|
assert evt.sender.command_status
|
|
new_id = evt.sender.command_status.pop("new_id")
|
|
evt.sender.command_status = None
|
|
await evt.mark_read()
|
|
try:
|
|
await evt.sender.client.change_uuid(new_id)
|
|
except CommandException:
|
|
await evt.reply(f"Failed to change KakaoTalk ID to `{new_id}`. Try again later.")
|
|
else:
|
|
await evt.reply(f"Successfully changed ID to `{new_id}`")
|
|
|
|
|
|
@command_handler(
|
|
needs_auth=True,
|
|
management_only=False,
|
|
help_section=SECTION_ACCOUNT,
|
|
help_text="Allow others to search by your KakaoTalk ID",
|
|
)
|
|
def make_id_searchable(evt: CommandEvent) -> Awaitable[None]:
|
|
return _set_id_searchable(evt, True)
|
|
|
|
@command_handler(
|
|
needs_auth=True,
|
|
management_only=False,
|
|
help_section=SECTION_ACCOUNT,
|
|
help_text="Prevent others from searching by your KakaoTalk ID",
|
|
)
|
|
def make_id_hidden(evt: CommandEvent) -> Awaitable[None]:
|
|
return _set_id_searchable(evt, False)
|
|
|
|
async def _set_id_searchable(evt: CommandEvent, searchable: bool) -> None:
|
|
await evt.mark_read()
|
|
try:
|
|
await evt.sender.client.set_uuid_searchable(searchable)
|
|
except RPCError as e:
|
|
await evt.reply(str(e))
|
|
else:
|
|
await evt.reply(f"Successfully made KakaoTalk ID {'searchable' if searchable else 'hidden'}")
|
|
|
|
async def _get_search_result_puppet(source: u.User, friend_struct: FriendStruct) -> pu.Puppet:
|
|
puppet = await pu.Puppet.get_by_ktid(friend_struct.userId)
|
|
if not puppet.name_set:
|
|
await puppet.update_info_from_friend(source, friend_struct)
|
|
return puppet
|
|
|
|
|
|
@command_handler(
|
|
needs_auth=True,
|
|
management_only=False,
|
|
help_section=SECTION_FRIENDS,
|
|
help_text="List all KakaoTalk friends",
|
|
)
|
|
async def list_friends(evt: CommandEvent) -> None:
|
|
await evt.mark_read()
|
|
try:
|
|
resp = await evt.sender.client.list_friends()
|
|
except CommandException as e:
|
|
await evt.reply(f"Error while listing friends: {e!s}")
|
|
return
|
|
puppets = await asyncio.gather(
|
|
*[
|
|
_get_search_result_puppet(evt.sender, friend_struct)
|
|
for friend_struct in resp.friends if friend_struct.userType == ApiUserType.NORMAL
|
|
# NOTE Using NORMAL to avoid listing KakaoTalk bots, which are apparently PLUS users
|
|
]
|
|
)
|
|
results = "".join(
|
|
f"* [{puppet.name}](https://matrix.to/#/{puppet.default_mxid})\n" for puppet in puppets
|
|
)
|
|
if results:
|
|
await evt.reply(f"{results}")
|
|
else:
|
|
await evt.reply("No friends found.")
|
|
|
|
|
|
class MentionFormatString(EntityString[SimpleEntity, EntityType], MarkdownString):
|
|
def format(self, entity_type: EntityType, **kwargs) -> MentionFormatString:
|
|
if entity_type == EntityType.USER_MENTION:
|
|
self.entities.append(
|
|
SimpleEntity(
|
|
type=entity_type,
|
|
offset=0,
|
|
length=len(self.text),
|
|
extra_info={"user_id": kwargs["user_id"]},
|
|
)
|
|
)
|
|
return self
|
|
|
|
class MentionParser(MatrixParser[MentionFormatString]):
|
|
fs = MentionFormatString
|
|
|
|
async def _get_id_from_mxid(mxid: UserID) -> Long | None:
|
|
user = await u.User.get_by_mxid(mxid, create=False)
|
|
if user and user.ktid:
|
|
return user.ktid
|
|
puppet = await pu.Puppet.get_by_mxid(mxid, create=False)
|
|
return puppet.ktid if puppet else None
|
|
|
|
|
|
@command_handler(
|
|
needs_auth=True,
|
|
management_only=False,
|
|
help_section=SECTION_FRIENDS,
|
|
help_text="Add a KakaoTalk user to your KakaoTalk friends list",
|
|
help_args="<_KakaoTalk ID_|_Matrix user ID_>",
|
|
)
|
|
def add_friend(evt: CommandEvent) -> Awaitable[None]:
|
|
return _edit_friend(evt, True)
|
|
|
|
@command_handler(
|
|
needs_auth=True,
|
|
management_only=False,
|
|
help_section=SECTION_FRIENDS,
|
|
help_text="Remove a KakaoTalk user from your KakaoTalk friends list",
|
|
help_args="<_KakaoTalk ID_|_Matrix user ID_>",
|
|
)
|
|
def remove_friend(evt: CommandEvent) -> Awaitable[None]:
|
|
return _edit_friend(evt, False)
|
|
|
|
async def _edit_friend(evt: CommandEvent, add: bool) -> None:
|
|
if not evt.args:
|
|
await evt.reply(f"**Usage:** `$cmdprefix+sp {evt.command} <KakaoTalk ID|Matrix user ID>`")
|
|
return
|
|
if evt.content.get("format", None) == Format.HTML and evt.content["formatted_body"]:
|
|
parsed = await MentionParser().parse(utf16_surrogate.add(
|
|
evt.content["formatted_body"][len(evt.command):].strip()
|
|
))
|
|
if not parsed.entities:
|
|
await evt.reply("No user found")
|
|
return
|
|
if (
|
|
len(parsed.entities) > 1 or
|
|
parsed.entities[0].offset != 0 or
|
|
parsed.entities[0].length != len(utf16_surrogate.remove(parsed.text))
|
|
):
|
|
await evt.reply("Can add only one friend at a time")
|
|
return
|
|
mxid = parsed.entities[0].extra_info["user_id"]
|
|
ktid = await _get_id_from_mxid(mxid)
|
|
if not ktid:
|
|
await evt.reply("No KakaoTalk user found for this Matrix ID")
|
|
else:
|
|
await _edit_friend_by_ktid(evt, ktid, add)
|
|
else:
|
|
arg = evt.content.body[len(evt.command):].strip()
|
|
ktid = await _get_id_from_mxid(arg)
|
|
if ktid:
|
|
await _edit_friend_by_ktid(evt, ktid, add)
|
|
else:
|
|
await _edit_friend_by_uuid(evt, arg, add)
|
|
|
|
async def _edit_friend_by_ktid(evt: CommandEvent, ktid: Long, add: bool) -> None:
|
|
await evt.mark_read()
|
|
try:
|
|
friend_struct = await evt.sender.client.edit_friend(ktid, add)
|
|
except RPCError as e:
|
|
await evt.reply(str(e))
|
|
else:
|
|
await _on_friend_edited(evt, friend_struct, add)
|
|
|
|
async def _edit_friend_by_uuid(evt: CommandEvent, uuid: str, add: bool) -> None:
|
|
await evt.mark_read()
|
|
try:
|
|
friend_struct = await evt.sender.client.edit_friend_by_uuid(uuid, add)
|
|
except RPCError as e:
|
|
await evt.reply(str(e))
|
|
except CommandException as e:
|
|
if e.status == -1002:
|
|
await evt.reply(
|
|
f"Failed to {'add' if add else 'remove'} friend. Ensure their ID is spelled correctly."
|
|
)
|
|
else:
|
|
raise
|
|
else:
|
|
await _on_friend_edited(evt, friend_struct, add)
|
|
|
|
async def _on_friend_edited(evt: CommandEvent, friend_struct: FriendStruct | None, add: bool):
|
|
await evt.reply(f"Friend {'added' if add else 'removed'}")
|
|
if friend_struct:
|
|
puppet = await pu.Puppet.get_by_ktid(friend_struct.userId)
|
|
await puppet.update_info_from_friend(evt.sender, friend_struct)
|
|
|
|
|
|
@command_handler(
|
|
needs_auth=True,
|
|
management_only=False,
|
|
help_section=SECTION_CHANNELS,
|
|
help_text="Leave this KakaoTalk channel",
|
|
)
|
|
async def leave(evt: CommandEvent) -> None:
|
|
if not evt.sender.is_connected:
|
|
await evt.reply("You are not connected to KakaoTalk chats")
|
|
return
|
|
if not evt.is_portal:
|
|
await evt.reply("This command may only be used in a KakaoTalk channel portal room")
|
|
return
|
|
await evt.mark_read()
|
|
try:
|
|
await evt.sender.leave_channel(evt.portal)
|
|
except CommandException as e:
|
|
await evt.reply(f"Error from KakaoTalk: {e}")
|