# matrix-appservice-kakaotalk - A Matrix-KakaoTalk puppeting bridge.
# Copyright (C) 2022 Tulir Asokan, Andrew Ferrazzutti
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations

from typing import Dict, List, NamedTuple, Optional

from mautrix.types import Format, MessageEventContent, RelationType, RoomID, UserID
from mautrix.util import utf16_surrogate
from mautrix.util.formatter import (
    EntityString,
    EntityType,
    MarkdownString,
    MatrixParser,
    SimpleEntity,
)
from mautrix.util.logging import TraceLogger

from ..kt.types.bson import Long
from ..kt.types.chat import KnownChatType
from ..kt.types.chat.attachment import ReplyAttachment, MentionStruct
from ..kt.client.types import TO_MSGTYPE_MAP
from .. import portal as po, puppet as pu, user as u
from ..db import Message as DBMessage


# Text used as the quoted body when the reply target is a media message.
# The FILE entry is a prefix: the converted text of the target is appended to it.
_media_type_reply_body_map: Dict[KnownChatType, str] = {
    KnownChatType.PHOTO: "Photo",
    KnownChatType.VIDEO: "Video",
    KnownChatType.AUDIO: "Voice Note",
    KnownChatType.FILE: "File: ",
}


class SendParams(NamedTuple):
    """Result of converting a Matrix message with :func:`matrix_to_kakaotalk`."""

    # Plain text body to send.
    text: str
    # One entry per mentioned KakaoTalk user, or None when nothing is mentioned.
    mentions: Optional[List[MentionStruct]]
    # Reply metadata, or None when the message is not bridged as a reply.
    reply_to: Optional[ReplyAttachment]


class KakaoTalkFormatString(EntityString[SimpleEntity, EntityType], MarkdownString):
    """Format string that renders Matrix HTML formatting as plain-text markup."""

    def format(self, entity_type: EntityType, **kwargs) -> KakaoTalkFormatString:
        """Apply ``entity_type`` to this string in place and return ``self``.

        A user mention leaves the text untouched and records an entity that
        covers the whole current text, carrying ``kwargs["user_id"]``. The
        other handled types wrap the text in a textual prefix and/or suffix.
        Entity types that are not handled here leave the string unchanged.

        Raises:
            KeyError: if a keyword argument needed by the entity type
                (``user_id``, ``url``, ``language`` or ``size``) is missing.
        """
        prefix = suffix = ""

        if entity_type == EntityType.USER_MENTION:
            self.entities.append(
                SimpleEntity(
                    type=entity_type,
                    offset=0,
                    length=len(self.text),
                    extra_info={"user_id": kwargs["user_id"]},
                )
            )
            return self
        elif entity_type == EntityType.BOLD:
            prefix = suffix = "*"
        elif entity_type == EntityType.ITALIC:
            prefix = suffix = "_"
        elif entity_type == EntityType.STRIKETHROUGH:
            prefix = suffix = "~"
        elif entity_type == EntityType.URL:
            # Only spell out the target when it differs from the visible text.
            if kwargs["url"] != self.text:
                suffix = f" ({kwargs['url']})"
        elif entity_type == EntityType.PREFORMATTED:
            prefix = f"```{kwargs['language']}\n"
            suffix = "\n```"
        elif entity_type == EntityType.INLINE_CODE:
            prefix = suffix = "`"
        elif entity_type == EntityType.BLOCKQUOTE:
            children = self.trim().split("\n")
            children = [child.prepend("> ") for child in children]
            return self.join(children, "\n")
        elif entity_type == EntityType.HEADER:
            prefix = "#" * kwargs["size"] + " "
        else:
            return self

        # Entities recorded so far must move right by the inserted prefix.
        self._offset_entities(len(prefix))
        self.text = f"{prefix}{self.text}{suffix}"
        return self


class ToKakaoTalkParser(MatrixParser[KakaoTalkFormatString]):
    """Matrix HTML parser that produces :class:`KakaoTalkFormatString` values."""

    fs = KakaoTalkFormatString


async def _get_id_from_mxid(mxid: UserID, portal: po.Portal) -> Optional[Long]:
    """Return the KakaoTalk ID associated with ``mxid``, or None if there is none.

    Lookup order: the bridge user with that MXID, then the relay sender the
    portal reports for that user, then the puppet with that MXID.
    """
    orig_sender = await u.User.get_by_mxid(mxid, create=False)
    if orig_sender and orig_sender.ktid:
        return orig_sender.ktid
    elif orig_sender:
        sender, _ = await portal.get_relay_sender(orig_sender, "relation")
        if sender and sender.ktid:
            return sender.ktid
    puppet = await pu.Puppet.get_by_mxid(mxid, create=False)
    return puppet.ktid if puppet else None


async def _build_reply_attachment(
    content: MessageEventContent,
    room_id: RoomID,
    log: TraceLogger,
    portal: po.Portal,
) -> ReplyAttachment:
    """Build the reply metadata for ``content``, which must be a reply.

    Also strips the reply fallback from ``content`` once the target has been
    resolved.

    Raises:
        ValueError: if the reply target is not a bridged message, or if no
            KakaoTalk user ID is found for the sender of the target.
        Exception: whatever fetching the target Matrix event raises.
    """
    message = await DBMessage.get_by_mxid(content.relates_to.event_id, room_id)
    if not message or not message.ktid:
        raise ValueError(
            f"Couldn't find reply target {content.relates_to.event_id}"
            " to bridge text message reply metadata to KakaoTalk"
        )
    try:
        src_event = await portal.main_intent.get_event(room_id, message.mxid)
    except Exception:
        # Not a bare ``except``: task cancellation should propagate without
        # being logged as a failed lookup.
        log.exception(f"Failed to find Matrix event for reply target {message.mxid}")
        raise
    src_kt_sender = await _get_id_from_mxid(src_event.sender, portal)
    if src_kt_sender is None:
        raise ValueError(
            f"Found no KakaoTalk user ID for reply target sender {src_event.sender}"
        )
    content.trim_reply_fallback()
    # Convert the target itself, without following its own reply chain.
    src_converted = await matrix_to_kakaotalk(
        src_event.content, room_id, log, portal, skip_reply=True
    )
    if src_event.content.relates_to.rel_type == RelationType.REPLY:
        src_type = KnownChatType.REPLY
        src_message = src_converted.text
    else:
        src_type = TO_MSGTYPE_MAP[src_event.content.msgtype]
        if src_type == KnownChatType.FILE:
            src_message = _media_type_reply_body_map[KnownChatType.FILE] + src_converted.text
        else:
            src_message = _media_type_reply_body_map.get(src_type, src_converted.text)
    return ReplyAttachment(
        # NOTE mentions will be merged into this later
        # TODO Set this for emoticon reply, but must first support them
        attach_only=False,
        # TODO If replying with media works, must set type AND all attachment properties
        #      But then, the reply object must be an intersection of a ReplyAttachment
        #      and something else, e.g. attach_type=TO_MSGTYPE_MAP.get(content.msgtype)
        # TODO Confirm why official client sets this to 0, and whether this should be
        #      left as None instead
        attach_type=0,
        src_logId=message.ktid,
        src_mentions=src_converted.mentions or [],
        src_message=src_message,
        src_type=src_type,
        src_userId=src_kt_sender,
    )


async def _collect_mentions(
    parsed: KakaoTalkFormatString,
    room_id: RoomID,
    portal: po.Portal,
) -> Optional[List[MentionStruct]]:
    """Turn the mention entities of ``parsed`` into KakaoTalk mention structs.

    Mentions of users who are not joined to the room, or for whom no
    KakaoTalk ID is found, are skipped. Each struct collects, for one user,
    the running count of "@" characters up to and including the first
    character of each of their mentions.

    Returns None when no mention survives the filtering.
    """
    if not parsed.entities:
        return None

    # Entity offsets index ``parsed.text`` (the string the parser produced),
    # so "@" must be counted there and not in a surrogate-stripped copy, whose
    # indices shift after every character outside the BMP.
    source_text = parsed.text
    # Make sure to not create remote mentions for any remote user not in the room
    joined_members = set(await portal.main_intent.get_room_members(room_id))
    mentions_by_user: Dict[Long, MentionStruct] = {}
    last_offset = 0
    at = 0
    for mention in sorted(parsed.entities, key=lambda entity: entity.offset):
        mxid = mention.extra_info["user_id"]
        if mxid not in joined_members:
            continue
        ktid = await _get_id_from_mxid(mxid, portal)
        if ktid is None:
            continue
        # Skipped mentions leave last_offset untouched, so their "@" is still
        # counted here when the next accepted mention is reached.
        at += source_text[last_offset : mention.offset + 1].count("@")
        last_offset = mention.offset + 1
        mention_by_user = mentions_by_user.setdefault(
            ktid,
            MentionStruct(
                at=[],
                len=mention.length,
                user_id=ktid,
            ),
        )
        mention_by_user.at.append(at)
    return list(mentions_by_user.values()) if mentions_by_user else None


async def matrix_to_kakaotalk(
    content: MessageEventContent,
    room_id: RoomID,
    log: TraceLogger,
    portal: po.Portal,
    skip_reply: bool = False
) -> SendParams:
    """Convert a Matrix message into the parameters for a KakaoTalk message.

    Args:
        content: The Matrix message content. If it is bridged as a reply, its
            reply fallback is trimmed in place.
        room_id: The Matrix room the message belongs to.
        log: Logger used to report a failure to fetch the reply target.
        portal: The portal for the room.
        skip_reply: When true, reply metadata is not built even if the
            message is a reply.

    Returns:
        The text, mentions and reply metadata to send.

    Raises:
        ValueError: if the message is a reply whose target or target sender
            can't be mapped to KakaoTalk.
    """
    # NOTE By design, this *throws* if user intent can't be matched
    #      (i.e. if a reply can't be created)
    if content.relates_to.rel_type == RelationType.REPLY and not skip_reply:
        reply_to = await _build_reply_attachment(content, room_id, log, portal)
    else:
        reply_to = None

    if (
        content.get("format", None) == Format.HTML
        and content["formatted_body"]
        and content.msgtype.is_text
        and not portal.is_direct
    ):
        parsed = await ToKakaoTalkParser().parse(utf16_surrogate.add(content["formatted_body"]))
        text = utf16_surrogate.remove(parsed.text)
        mentions = await _collect_mentions(parsed, room_id, portal)
    else:
        text = content.body
        mentions = None
    return SendParams(text=text, mentions=mentions, reply_to=reply_to)