# matrix-appservice-kakaotalk - A Matrix-KakaoTalk puppeting bridge. # Copyright (C) 2022 Tulir Asokan, Andrew Ferrazzutti # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . from __future__ import annotations from typing import ( TYPE_CHECKING, Any, AsyncGenerator, Awaitable, Callable, Pattern, cast, ) from io import BytesIO from mimetypes import guess_extension import asyncio import re import time from mautrix.appservice import IntentAPI from mautrix.bridge import BasePortal, NotificationDisabler, async_getter_lock from mautrix.errors import MatrixError from mautrix.types import ( AudioInfo, ContentURI, EncryptedFile, EventID, EventType, FileInfo, ImageInfo, LocationMessageEventContent, MediaInfo, MediaMessageEventContent, Membership, MessageEventContent, MessageType, RoomID, TextMessageEventContent, UserID, VideoInfo, ) from mautrix.util import ffmpeg, magic from mautrix.util.message_send_checkpoint import MessageSendCheckpointStatus from mautrix.util.simple_lock import SimpleLock from . import matrix as m, puppet as p, user as u from .config import Config from .db import ( Message as DBMessage, Portal as DBPortal, ) from .formatter import kakaotalk_to_matrix, matrix_to_kakaotalk from .kt.types.bson import Long from .kt.types.channel.channel_info import ChannelInfo from .kt.types.channel.channel_type import KnownChannelType, ChannelType from .kt.types.chat import Chatlog, KnownChatType from .kt.types.chat.attachment import ( Attachment, AudioAttachment, #FileAttachment, MediaAttachment, MultiPhotoAttachment, PhotoAttachment, VideoAttachment, ) from .kt.client.types import ( UserInfoUnion, PortalChannelInfo, ChannelProps, TO_MSGTYPE_MAP, ) from .kt.client.errors import CommandException if TYPE_CHECKING: from .__main__ import KakaoTalkBridge try: from PIL import Image except ImportError: Image = None try: from mautrix.crypto.attachments import decrypt_attachment, encrypt_attachment except ImportError: decrypt_attachment = encrypt_attachment = None geo_uri_regex: Pattern = re.compile(r"^geo:(-?\d+.\d+),(-?\d+.\d+)$") class FakeLock: async def __aenter__(self) -> None: pass async def __aexit__(self, exc_type, exc, tb) -> None: pass StateBridge = EventType.find("m.bridge", EventType.Class.STATE) StateHalfShotBridge = EventType.find("uk.half-shot.bridge", EventType.Class.STATE) class Portal(DBPortal, BasePortal): invite_own_puppet_to_pm: bool = False by_mxid: dict[RoomID, Portal] = {} by_ktid: dict[tuple[int, int], Portal] = {} matrix: m.MatrixHandler config: Config _main_intent: IntentAPI | None _kt_sender: int | None _create_room_lock: asyncio.Lock _send_locks: dict[int, asyncio.Lock] _noop_lock: FakeLock = FakeLock() _typing: set[UserID] backfill_lock: SimpleLock _backfill_leave: set[IntentAPI] | None _sleeping_to_resync: bool _scheduled_resync: asyncio.Task | None _resync_targets: dict[int, p.Puppet] def __init__( self, ktid: Long, kt_receiver: Long, kt_type: ChannelType, mxid: RoomID | None = None, name: str | None = None, photo_id: str | None = None, avatar_url: ContentURI | None = None, encrypted: bool = False, name_set: bool = False, avatar_set: bool = False, relay_user_id: UserID | None = None, ) -> None: super().__init__( ktid, kt_receiver, kt_type, mxid, name, photo_id, avatar_url, encrypted, name_set, avatar_set, relay_user_id, ) self.log = self.log.getChild(self.ktid_log) self._main_intent = None self._kt_sender = None self._create_room_lock = asyncio.Lock() self._send_locks = {} self._typing = set() self._sleeping_to_resync = False self._scheduled_resync = None self._resync_targets = {} self.backfill_lock = SimpleLock( "Waiting for backfilling to finish before handling %s", log=self.log ) self._backfill_leave = None self._relay_user = None @classmethod def init_cls(cls, bridge: KakaoTalkBridge) -> None: BasePortal.bridge = bridge cls.az = bridge.az cls.config = bridge.config cls.loop = bridge.loop cls.matrix = bridge.matrix cls.invite_own_puppet_to_pm = cls.config["bridge.invite_own_puppet_to_pm"] NotificationDisabler.puppet_cls = p.Puppet NotificationDisabler.config_enabled = cls.config["bridge.backfill.disable_notifications"] # TODO More cls._message_type_handler_map: dict[ KnownChatType, Callable[ [ Portal, u.User, IntentAPI, Attachment | None, int, str | None ], Awaitable[list[EventID]] ] ] = { KnownChatType.TEXT: cls._handle_remote_text, KnownChatType.PHOTO: cls._handle_remote_photo, KnownChatType.MULTIPHOTO: cls._handle_remote_multiphoto, KnownChatType.VIDEO: cls._handle_remote_video, KnownChatType.AUDIO: cls._handle_remote_audio, #KnownChatType.FILE: cls._handle_remote_file, } # region DB conversion async def delete(self) -> None: if self.mxid: await DBMessage.delete_all_by_room(self.mxid) self.by_mxid.pop(self.mxid, None) self.by_ktid.pop(self._ktid_full, None) self.mxid = None self.name_set = False self.avatar_set = False self.relay_user_id = None self.encrypted = False await super().save() # endregion # region Properties @property def _ktid_full(self) -> tuple[int, int]: return self.ktid, self.kt_receiver @property def ktid_log(self) -> str: if self.is_direct: return f"{self.ktid}->{self.kt_receiver}" return str(self.ktid) @property def is_direct(self) -> bool: return KnownChannelType.is_direct(self.kt_type) @property def kt_sender(self) -> int | None: if self.is_direct: if not self._kt_sender: raise ValueError("Direct chat portal must set sender") else: if self._kt_sender: raise ValueError(f"Non-direct chat portal should have no sender, but has sender {self._kt_sender}") return self._kt_sender @property def channel_props(self) -> ChannelProps: return ChannelProps( id=self.ktid, type=self.kt_type ) @property def main_intent(self) -> IntentAPI: if not self._main_intent: raise ValueError( "Portal must be postinit()ed before main_intent can be used" if not self.is_direct else "Direct chat portal must call postinit and _update_participants before main_intent can be used" ) return self._main_intent async def get_dm_puppet(self) -> p.Puppet | None: if not self.is_direct: return None return await p.Puppet.get_by_ktid(self.kt_sender) # endregion # region Chat info updating def schedule_resync(self, source: u.User, target: p.Puppet) -> None: self._resync_targets[target.ktid] = target if ( self._sleeping_to_resync and self._scheduled_resync and not self._scheduled_resync.done() ): return self._sleeping_to_resync = True self.log.debug(f"Scheduling resync through {source.mxid}/{source.ktid}") self._scheduled_resync = asyncio.create_task(self._sleep_and_resync(source, 10)) async def _sleep_and_resync(self, source: u.User, sleep: int) -> None: await asyncio.sleep(sleep) targets = self._resync_targets self._sleeping_to_resync = False self._resync_targets = {} for puppet in targets.values(): if not puppet.name or not puppet.name_set: break else: self.log.debug( f"Cancelled resync through {source.mxid}/{source.ktid}, all puppets have names" ) return self.log.debug(f"Resyncing chat through {source.mxid}/{source.ktid} after sleeping") await self.update_info(source) self._scheduled_resync = None self.log.debug(f"Completed scheduled resync through {source.mxid}/{source.ktid}") async def update_info( self, source: u.User, info: PortalChannelInfo | None = None, force_save: bool = False, ) -> PortalChannelInfo: if not info: self.log.debug("Called update_info with no info, fetching channel info...") info = await source.client.get_portal_channel_info(self.channel_props) changed = False if not self.is_direct: changed = any( await asyncio.gather( self._update_name(info.name), # TODO #self._update_photo(source, info.image), ) ) changed = await self._update_participants(source, info.participants) or changed if changed or force_save: await self.update_bridge_info() await self.save() return info @classmethod async def _reupload_remote_file( cls, url: str, source: u.User, intent: IntentAPI, *, filename: str | None = None, mimetype: str | None, encrypt: bool = False, find_size: bool = False, convert_audio: bool = False, ) -> tuple[ContentURI, FileInfo | VideoInfo | AudioInfo | ImageInfo, EncryptedFile | None]: if not url: raise ValueError("URL not provided") sandbox = cls.config["bridge.sandbox_media_download"] # TODO Referer header? async with source.client.get(url, sandbox=sandbox) as resp: length = int(resp.headers["Content-Length"]) if length > cls.matrix.media_config.upload_size: raise ValueError("File not available: too large") data = await resp.read() if not mimetype: mimetype = magic.mimetype(data) if convert_audio and mimetype != "audio/ogg": data = await ffmpeg.convert_bytes( data, ".ogg", output_args=("-c:a", "libopus"), input_mime=mimetype ) mimetype = "audio/ogg" info = FileInfo(mimetype=mimetype, size=len(data)) if Image and mimetype.startswith("image/") and find_size: with Image.open(BytesIO(data)) as img: width, height = img.size info = ImageInfo(mimetype=mimetype, size=len(data), width=width, height=height) upload_mime_type = mimetype decryption_info = None if encrypt and encrypt_attachment: data, decryption_info = encrypt_attachment(data) upload_mime_type = "application/octet-stream" filename = None url = await intent.upload_media( data, mime_type=upload_mime_type, filename=filename, async_upload=cls.config["homeserver.async_media"], ) if decryption_info: decryption_info.url = url return url, info, decryption_info async def _update_name(self, name: str) -> bool: if not name: self.log.warning("Got empty name in _update_name call") return False if self.name != name or not self.name_set: self.log.trace("Updating name %s -> %s", self.name, name) self.name = name if self.mxid and (self.encrypted or not self.is_direct): try: await self.main_intent.set_room_name(self.mxid, self.name) self.name_set = True except Exception: self.log.exception("Failed to set room name") self.name_set = False return True return False """ async def _update_photo(self, source: u.User, photo: graphql.Picture) -> bool: if self.is_direct and not self.encrypted: return False photo_id = self.get_photo_id(photo) if self.photo_id != photo_id or not self.avatar_set: self.photo_id = photo_id if photo: if self.photo_id != photo_id or not self.avatar_url: # Reset avatar_url first in case the upload fails self.avatar_url = None self.avatar_url = await p.Puppet.reupload_avatar( source, self.main_intent, photo.uri, self.ktid, use_graph=self.is_direct and (photo.height or 0) < 500, ) else: self.avatar_url = ContentURI("") if self.mxid: try: await self.main_intent.set_room_avatar(self.mxid, self.avatar_url) self.avatar_set = True except Exception: self.log.exception("Failed to set room avatar") self.avatar_set = False return True return False """ async def _update_photo_from_puppet(self, puppet: p.Puppet) -> bool: if self.photo_id == puppet.photo_id and self.avatar_set: return False self.photo_id = puppet.photo_id if puppet.photo_mxc: self.avatar_url = puppet.photo_mxc elif self.photo_id: profile = await self.main_intent.get_profile(puppet.default_mxid) self.avatar_url = profile.avatar_url puppet.photo_mxc = profile.avatar_url else: self.avatar_url = ContentURI("") if self.mxid: try: await self.main_intent.set_room_avatar(self.mxid, self.avatar_url) self.avatar_set = True except Exception: self.log.exception("Failed to set room avatar") self.avatar_set = False return True async def update_info_from_puppet(self, puppet: p.Puppet | None = None) -> bool: if not self.is_direct: return False if not puppet: puppet = await self.get_dm_puppet() changed = await self._update_name(puppet.name) changed = await self._update_photo_from_puppet(puppet) or changed return changed """ async def sync_per_room_nick(self, puppet: p.Puppet, name: str) -> None: intent = puppet.intent_for(self) content = MemberStateEventContent( membership=Membership.JOIN, avatar_url=puppet.photo_mxc, displayname=name or puppet.name, ) content[DOUBLE_PUPPET_SOURCE_KEY] = self.bridge.name current_state = await intent.state_store.get_member(self.mxid, intent.mxid) if not current_state or current_state.displayname != content.displayname: self.log.debug( "Syncing %s's per-room nick %s to the room", puppet.ktid, content.displayname, ) await intent.send_state_event( self.mxid, EventType.ROOM_MEMBER, content, state_key=intent.mxid ) """ async def _update_participant( self, source: u.User, participant: UserInfoUnion ) -> bool: # TODO nick map? self.log.trace("Syncing participant %s", participant.id) puppet = await p.Puppet.get_by_ktid(participant.userId) await puppet.update_info_from_participant(source, participant) changed = False if self.is_direct and self._kt_sender == puppet.ktid and self.encrypted: changed = await self._update_info_from_puppet(puppet.name) or changed if self.mxid: if puppet.ktid != self.kt_receiver or puppet.is_real_user: await puppet.intent_for(self).ensure_joined(self.mxid, bot=self.main_intent) #if puppet.ktid in nick_map: # await self.sync_per_room_nick(puppet, nick_map[puppet.ktid]) return changed async def _update_participants(self, source: u.User, participants: list[UserInfoUnion] | None = None) -> bool: # TODO nick map? if participants is None: self.log.debug("Called _update_participants with no participants, fetching them now...") participants = await source.client.get_participants(self.channel_props) if not self._main_intent: assert self.is_direct, "_main_intent for non-direct chat portal should have been set already" self._kt_sender = participants[ 0 if self.kt_type == KnownChannelType.MemoChat or participants[0].userId != source.ktid else 1 ].userId self._main_intent = (await self.get_dm_puppet()).default_mxid_intent else: self._kt_sender = (await p.Puppet.get_by_mxid(self._main_intent.mxid)).ktid if self.is_direct else None sync_tasks = [ self._update_participant(source, pcp) for pcp in participants ] changed = any(await asyncio.gather(*sync_tasks)) return changed # endregion # region Matrix room creation async def update_matrix_room(self, source: u.User, info: PortalChannelInfo | None = None) -> None: try: await self._update_matrix_room(source, info) except Exception: self.log.exception("Failed to update portal") def _get_invite_content(self, double_puppet: p.Puppet | None) -> dict[str, Any]: invite_content = {} if double_puppet: invite_content["fi.mau.will_auto_accept"] = True if self.is_direct: invite_content["is_direct"] = True return invite_content async def _update_matrix_room( self, source: u.User, info: PortalChannelInfo | None = None ) -> None: info = await self.update_info(source, info) puppet = await p.Puppet.get_by_custom_mxid(source.mxid) await self.main_intent.invite_user( self.mxid, source.mxid, check_cache=True, extra_content=self._get_invite_content(puppet), ) if puppet: did_join = await puppet.intent.ensure_joined(self.mxid) if did_join and self.is_direct: await source.update_direct_chats({self.main_intent.mxid: [self.mxid]}) # TODO #await self._sync_read_receipts(info.read_receipts.nodes) """ async def _sync_read_receipts(self, receipts: list[None]) -> None: for receipt in receipts: message = await DBMessage.get_closest_before( self.ktid, self.kt_receiver, receipt.timestamp ) if not message: continue puppet = await p.Puppet.get_by_ktid(receipt.actor.id, create=False) if not puppet: continue try: await puppet.intent_for(self).mark_read(message.mx_room, message.mxid) except Exception: self.log.warning( f"Failed to mark {message.mxid} in {message.mx_room} " f"as read by {puppet.intent.mxid}", exc_info=True, ) """ async def create_matrix_room( self, source: u.User, info: PortalChannelInfo | None = None ) -> RoomID | None: if self.mxid: try: await self._update_matrix_room(source, info) except Exception: self.log.exception("Failed to update portal") return self.mxid async with self._create_room_lock: try: return await self._create_matrix_room(source, info) except Exception: self.log.exception("Failed to create portal") return None @property def bridge_info_state_key(self) -> str: return f"net.miscworks.kakaotalk://kakaotalk/{self.ktid}" @property def bridge_info(self) -> dict[str, Any]: return { "bridgebot": self.az.bot_mxid, "creator": self.main_intent.mxid, "protocol": { "id": "kakaotalk", "displayname": "KakaoTalk", "avatar_url": self.config["appservice.bot_avatar"], }, "channel": { "id": str(self.ktid), "displayname": self.name, "avatar_url": self.avatar_url, }, } async def update_bridge_info(self) -> None: if not self.mxid: self.log.debug("Not updating bridge info: no Matrix room created") return try: self.log.debug("Updating bridge info...") await self.main_intent.send_state_event( self.mxid, StateBridge, self.bridge_info, self.bridge_info_state_key ) # TODO remove this once https://github.com/matrix-org/matrix-doc/pull/2346 is in spec await self.main_intent.send_state_event( self.mxid, StateHalfShotBridge, self.bridge_info, self.bridge_info_state_key ) except Exception: self.log.warning("Failed to update bridge info", exc_info=True) async def _create_matrix_room( self, source: u.User, info: PortalChannelInfo | None = None ) -> RoomID | None: if self.mxid: await self._update_matrix_room(source, info) return self.mxid self.log.debug(f"Creating Matrix room") if self.is_direct: # NOTE Must do this to find the other member of the DM, since the channel ID != the member's ID! await self._update_participants(source, info.participants) name: str | None = None initial_state = [ { "type": str(StateBridge), "state_key": self.bridge_info_state_key, "content": self.bridge_info, }, # TODO remove this once https://github.com/matrix-org/matrix-doc/pull/2346 is in spec { "type": str(StateHalfShotBridge), "state_key": self.bridge_info_state_key, "content": self.bridge_info, }, ] invites = [] if self.config["bridge.encryption.default"] and self.matrix.e2ee: self.encrypted = True initial_state.append( { "type": "m.room.encryption", "content": {"algorithm": "m.megolm.v1.aes-sha2"}, } ) if self.is_direct: invites.append(self.az.bot_mxid) info = await self.update_info(source=source, info=info) if self.encrypted or not self.is_direct: name = self.name initial_state.append( { "type": str(EventType.ROOM_AVATAR), "content": {"url": self.avatar_url}, } ) # We lock backfill lock here so any messages that come between the room being created # and the initial backfill finishing wouldn't be bridged before the backfill messages. with self.backfill_lock: creation_content = {} if not self.config["bridge.federate_rooms"]: creation_content["m.federate"] = False self.mxid = await self.main_intent.create_room( name=name, is_direct=self.is_direct, initial_state=initial_state, invitees=invites, creation_content=creation_content, ) if not self.mxid: raise Exception("Failed to create room: no mxid returned") if self.encrypted and self.matrix.e2ee and self.is_direct: try: await self.az.intent.ensure_joined(self.mxid) except Exception: self.log.warning(f"Failed to add bridge bot to new private chat {self.mxid}") await self.save() self.log.debug(f"Matrix room created: {self.mxid}") self.by_mxid[self.mxid] = self puppet = await p.Puppet.get_by_custom_mxid(source.mxid) await self.main_intent.invite_user( self.mxid, source.mxid, extra_content=self._get_invite_content(puppet) ) if puppet: try: if self.is_direct: await source.update_direct_chats({self.main_intent.mxid: [self.mxid]}) await puppet.intent.join_room_by_id(self.mxid) except MatrixError: self.log.debug( "Failed to join custom puppet into newly created portal", exc_info=True, ) if not self.is_direct: # NOTE Calling this after room creation to invite participants await self._update_participants(source, info.participants) try: await self.backfill(source, is_initial=True, channel_info=info.channel_info) except Exception: self.log.exception("Failed to backfill new portal") # TODO #await self._sync_read_receipts(info.read_receipts.nodes) return self.mxid # endregion # region Matrix event handling def require_send_lock(self, user_id: int) -> asyncio.Lock: try: lock = self._send_locks[user_id] except KeyError: lock = asyncio.Lock() self._send_locks[user_id] = lock return lock def optional_send_lock(self, user_id: int) -> asyncio.Lock | FakeLock: try: return self._send_locks[user_id] except KeyError: pass return self._noop_lock async def _send_delivery_receipt(self, event_id: EventID) -> None: if event_id and self.config["bridge.delivery_receipts"]: try: await self.az.intent.mark_read(self.mxid, event_id) except Exception: self.log.exception(f"Failed to send delivery receipt for {event_id}") async def _send_bridge_error(self, msg: str, thing: str = "message") -> None: await self._send_message( self.main_intent, TextMessageEventContent( msgtype=MessageType.NOTICE, body=f"\u26a0 Your {thing} may not have been bridged: {msg}", ), ) def _status_from_exception(self, e: Exception) -> MessageSendCheckpointStatus: if isinstance(e, NotImplementedError): return MessageSendCheckpointStatus.UNSUPPORTED return MessageSendCheckpointStatus.PERM_FAILURE async def handle_matrix_message( self, sender: u.User, message: MessageEventContent, event_id: EventID ) -> None: try: await self._handle_matrix_message(sender, message, event_id) except Exception as e: self.log.exception(f"Failed to handle Matrix event {event_id}: {e}") sender.send_remote_checkpoint( self._status_from_exception(e), event_id, self.mxid, EventType.ROOM_MESSAGE, message.msgtype, error=e, ) await self._send_bridge_error(str(e)) else: await self._send_delivery_receipt(event_id) async def _handle_matrix_message( self, orig_sender: u.User, message: MessageEventContent, event_id: EventID ) -> None: if message.get_edit(): raise NotImplementedError("Edits are not supported by the KakaoTalk bridge.") sender, is_relay = await self.get_relay_sender(orig_sender, f"message {event_id}") if not sender: raise Exception("not logged in") elif not sender.has_state: raise Exception("not connected to KakaoTalk") elif is_relay: await self.apply_relay_message_format(orig_sender, message) if message.msgtype == MessageType.TEXT or message.msgtype == MessageType.NOTICE: await self._handle_matrix_text(event_id, sender, message) elif message.msgtype.is_media: await self._handle_matrix_media(event_id, sender, message) # elif message.msgtype == MessageType.LOCATION: # await self._handle_matrix_location(sender, message) else: raise NotImplementedError(f"Unsupported message type {message.msgtype}") async def _make_dbm(self, event_id: EventID, ktid: Long | None = None) -> DBMessage: dbm = DBMessage( mxid=event_id, mx_room=self.mxid, ktid=ktid, index=0, kt_chat=self.ktid, kt_receiver=self.kt_receiver, timestamp=int(time.time() * 1000), ) await dbm.insert() return dbm async def _handle_matrix_text( self, event_id: EventID, sender: u.User, message: TextMessageEventContent ) -> None: converted = await matrix_to_kakaotalk(message, self.mxid, self.log) try: chatlog = await sender.client.send_message( self.channel_props, text=converted.text, # TODO #mentions=converted.mentions, #reply_to=converted.reply_to, ) except CommandException as e: self.log.debug(f"Error handling Matrix message {event_id}: {e!s}") raise await self._make_dbm(event_id, chatlog.logId) self.log.debug(f"Handled Matrix message {event_id} -> {chatlog.logId}") sender.send_remote_checkpoint( MessageSendCheckpointStatus.SUCCESS, event_id, self.mxid, EventType.ROOM_MESSAGE, message.msgtype, ) async def _handle_matrix_media( self, event_id: EventID, sender: u.User, message: MediaMessageEventContent ) -> None: if message.file and decrypt_attachment: data = await self.main_intent.download_media(message.file.url) data = decrypt_attachment( data, message.file.key.key, message.file.hashes.get("sha256"), message.file.iv ) elif message.url: data = await self.main_intent.download_media(message.url) else: raise NotImplementedError("No file or URL specified") mimetype = message.info.mimetype or magic.mimetype(data) """ TODO Replies reply_to = None if message.relates_to.rel_type == RelationType.REPLY: reply_to_msg = await DBMessage.get_by_mxid(message.relates_to.event_id, self.mxid) if reply_to_msg: reply_to = reply_to_msg.ktid else: self.log.warning( f"Couldn't find reply target {message.relates_to.event_id}" " to bridge media message reply metadata to KakaoTalk" ) """ filename = message.body width, height = None, None if message.info in (MessageType.IMAGE, MessageType.STICKER, MessageType.VIDEO): width = message.info.width height = message.info.height try: chatlog = await sender.client.send_media( self.channel_props, TO_MSGTYPE_MAP[message.msgtype], data, filename, width=width, height=height, ext=guess_extension(mimetype)[1:], # TODO #reply_to=reply_to, ) except CommandException as e: self.log.debug(f"Error uploading media for Matrix message {event_id}: {e!s}") raise await self._make_dbm(event_id, chatlog.logId) self.log.debug(f"Handled Matrix message {event_id} -> {chatlog.logId}") sender.send_remote_checkpoint( MessageSendCheckpointStatus.SUCCESS, event_id, self.mxid, EventType.ROOM_MESSAGE, message.msgtype, ) async def _handle_matrix_location( self, sender: u.User, message: LocationMessageEventContent ) -> str: pass # TODO # match = geo_uri_regex.fullmatch(message.geo_uri) # return await self.thread_for(sender).send_pinned_location(float(match.group(1)), # float(match.group(2))) async def handle_matrix_redaction( self, sender: u.User, event_id: EventID, redaction_event_id: EventID ) -> None: try: await self._handle_matrix_redaction(sender, event_id, redaction_event_id) except Exception as e: self.log.exception(f"Failed to handle Matrix event {event_id}: {e}") sender.send_remote_checkpoint( self._status_from_exception(e), event_id, self.mxid, EventType.ROOM_REDACTION, error=e, ) await self._send_bridge_error(str(e)) else: await self._send_delivery_receipt(event_id) async def _handle_matrix_redaction( self, sender: u.User, event_id: EventID, redaction_event_id: EventID ) -> None: self.log.info("TODO: _handle_matrix_redaction") async def handle_matrix_reaction( self, sender: u.User, event_id: EventID, reacting_to: EventID, reaction: str ) -> None: self.log.info("TODO: handle_matrix_reaction") async def handle_matrix_leave(self, user: u.User) -> None: if self.is_direct: self.log.info(f"{user.mxid} left private chat portal with {self.ktid}") if user.ktid == self.kt_receiver: self.log.info( f"{user.mxid} was the recipient of this portal. Cleaning up and deleting..." ) await self.cleanup_and_delete() else: self.log.debug(f"{user.mxid} left portal to {self.ktid}") async def _set_typing(self, users: set[UserID], typing: bool) -> None: self.log.info("TODO: _set_typing") async def handle_matrix_typing(self, users: set[UserID]) -> None: await asyncio.gather( self._set_typing(users - self._typing, typing=True), self._set_typing(self._typing - users, typing=False), ) self._typing = users # endregion # region KakaoTalk event handling async def _bridge_own_message_pm( self, source: u.User, sender: p.Puppet, mid: str, invite: bool = True ) -> bool: if self.is_direct and sender.ktid == source.ktid and not sender.is_real_user: if self.invite_own_puppet_to_pm and invite: await self.main_intent.invite_user(self.mxid, sender.mxid) elif ( await self.az.state_store.get_membership(self.mxid, sender.mxid) != Membership.JOIN ): self.log.warning( f"Ignoring own {mid} in private chat because own puppet is not in room." ) return False return True async def handle_remote_message( self, source: u.User, sender: p.Puppet, message: Chatlog, ) -> None: try: await self._handle_remote_message(source, sender, message) except Exception: self.log.exception( "Error handling KakaoTalk message %s", message.logId, ) async def _handle_remote_message( self, source: u.User, sender: p.Puppet, message: Chatlog, ) -> None: # TODO Backfill!! This avoids timing conflicts on startup sync self.log.debug(f"Handling KakaoTalk event {message.logId}") if not self.mxid: mxid = await self.create_matrix_room(source) if not mxid: # Failed to create return if not await self._bridge_own_message_pm(source, sender, f"message {message.logId}"): return intent = sender.intent_for(self) if ( self._backfill_leave is not None and self.ktid != sender.ktid and intent != sender.intent and intent not in self._backfill_leave ): self.log.debug("Adding %s's default puppet to room for backfilling", sender.mxid) await self.main_intent.invite_user(self.mxid, intent.mxid) await intent.ensure_joined(self.mxid) self._backfill_leave.add(intent) handler = self._message_type_handler_map.get(message.type) if not handler: self.log.warning(f"No handler for message type {message.type}, falling back to text") handler = Portal._handle_remote_text event_ids = [ event_id for event_id in await handler( self, source, intent, message.attachment, message.sendAt, message.text) if event_id ] if not event_ids: self.log.warning(f"Unhandled KakaoTalk message {message.logId}") return self.log.debug(f"Handled KakaoTalk message {message.logId} -> {event_ids}") # TODO Might have to handle remote reactions on messages created by bulk_create await DBMessage.bulk_create( ktid=message.logId, kt_chat=self.ktid, kt_receiver=self.kt_receiver, mx_room=self.mxid, timestamp=message.sendAt, event_ids=event_ids, ) await self._send_delivery_receipt(event_ids[-1]) async def _handle_remote_text( self, source: u.User, intent: IntentAPI, attachment: None, timestamp: int, message_text: str | None, ) -> list[EventID]: # TODO Handle mentions properly content = await kakaotalk_to_matrix(message_text) # TODO Replies return [await self._send_message(intent, content, timestamp=timestamp)] def _handle_remote_photo( self, source: u.User, intent: IntentAPI, attachment: PhotoAttachment, timestamp: int, message_text: str | None, ) -> Awaitable[list[EventID]]: return asyncio.gather(self._handle_remote_uniphoto( source, intent, attachment, timestamp, message_text )) async def _handle_remote_multiphoto( self, source: u.User, intent: IntentAPI, attachment: MultiPhotoAttachment, timestamp: int, message_text: str | None, ) -> Awaitable[list[EventID]]: # TODO Upload media concurrently, but post messages sequentially return [ await self._handle_remote_uniphoto( source, intent, PhotoAttachment( shout=attachment.shout, mentions=attachment.mentions, urls=attachment.urls, url=attachment.imageUrls[i], s=attachment.sl[i], k=attachment.kl[i], w=attachment.wl[i], h=attachment.hl[i], thumbnailUrl=attachment.thumbnailUrls[i], thumbnailWidth=attachment.thumbnailWidths[i], thumbnailHeight=attachment.thumbnailHeights[i], cs=attachment.csl[i], mt=attachment.mtl[i], ), timestamp, message_text, ) for i in range(len(attachment.imageUrls)) ] def _handle_remote_uniphoto( self, source: u.User, intent: IntentAPI, attachment: PhotoAttachment, timestamp: int, message_text: str | None, ) -> Awaitable[EventID]: return self._handle_remote_media( source, intent, attachment, timestamp, message_text, ImageInfo( mimetype=attachment.mt, size=attachment.s, width=attachment.w, height=attachment.h, ), MessageType.IMAGE, ) def _handle_remote_video( self, source: u.User, intent: IntentAPI, attachment: VideoAttachment, timestamp: int, message_text: str | None, ) -> Awaitable[list[EventID]]: return asyncio.gather(self._handle_remote_media( source, intent, attachment, timestamp, message_text, VideoInfo( duration=attachment.d, width=attachment.w, height=attachment.h, ), MessageType.VIDEO, )) def _handle_remote_audio( self, source: u.User, intent: IntentAPI, attachment: AudioAttachment, timestamp: int, message_text: str | None, ) -> Awaitable[list[EventID]]: return asyncio.gather(self._handle_remote_media( source, intent, attachment, timestamp, message_text, AudioInfo( size=attachment.s, duration=attachment.d, ), MessageType.AUDIO, )) """ TODO Find what auth is required for reading file contents def _handle_remote_file( self, source: u.User, intent: IntentAPI, attachment: FileAttachment, timestamp: int, message_text: str | None, ) -> Awaitable[list[EventID]]: return asyncio.gather(self._handle_remote_media( source, intent, attachment, timestamp, message_text, FileInfo( size=attachment.size, ), MessageType.FILE, )) """ async def _handle_remote_media( self, source: u.User, intent: IntentAPI, attachment: MediaAttachment, timestamp: int, message_text: str | None, info: MediaInfo, msgtype: MessageType, ) -> EventID: mxc, additional_info, decryption_info = await self._reupload_remote_file( attachment.url, source, intent, mimetype=info.mimetype, encrypt=self.encrypted, find_size=False, ) info.size = additional_info.size info.mimetype = additional_info.mimetype content = MediaMessageEventContent( url=mxc, file=decryption_info, msgtype=msgtype, body=message_text, info=info ) # TODO Replies return await self._send_message(intent, content, timestamp=timestamp) # TODO Many more remote handlers # endregion async def backfill(self, source: u.User, is_initial: bool, channel_info: ChannelInfo) -> None: limit = ( self.config["bridge.backfill.initial_limit"] if is_initial else self.config["bridge.backfill.missed_limit"] ) if limit == 0: return elif limit < 0: limit = None last_log_id = None if not is_initial and channel_info.lastChatLog: last_log_id = channel_info.lastChatLog.logId most_recent = await DBMessage.get_most_recent(self.ktid, self.kt_receiver) if most_recent and is_initial: self.log.debug("Not backfilling %s: already bridged messages found", self.ktid_log) # TODO Should this be removed? With it, can't sync an empty portal! #elif (not most_recent or not most_recent.timestamp) and not is_initial: # self.log.debug("Not backfilling %s: no most recent message found", self.ktid_log) elif last_log_id and most_recent and int(most_recent.ktid) >= int(last_log_id): self.log.debug( "Not backfilling %s: last activity is equal to most recent bridged " "message (%s >= %s)", self.ktid_log, most_recent.ktid, last_log_id, ) else: with self.backfill_lock: await self._backfill( source, limit, most_recent.ktid if most_recent else None, channel_info=channel_info, ) async def _backfill( self, source: u.User, limit: int | None, after_log_id: Long | None, channel_info: ChannelInfo, ) -> None: self.log.debug(f"Backfilling history through {source.mxid}") self.log.debug(f"Fetching {f'up to {limit}' if limit else 'all'} messages through {source.ktid}") messages = await source.client.get_chats( self.channel_props, after_log_id, limit, ) if not messages: self.log.debug("Didn't get any messages from server") return self.log.debug(f"Got {len(messages)} message{'s' if len(messages) > 1 else ''} from server") self._backfill_leave = set() async with NotificationDisabler(self.mxid, source): for message in messages: puppet = await p.Puppet.get_by_ktid(message.sender.userId) await self.handle_remote_message(source, puppet, message) for intent in self._backfill_leave: self.log.trace("Leaving room with %s post-backfill", intent.mxid) await intent.leave_room(self.mxid) self.log.info("Backfilled %d messages through %s", len(messages), source.mxid) # region Database getters async def postinit(self) -> None: self.by_ktid[self._ktid_full] = self if self.mxid: self.by_mxid[self.mxid] = self if not self.is_direct: self._main_intent = self.az.intent elif self.mxid: # TODO Save kt_sender in DB instead? Depends on if DM channels are shared... user = await u.User.get_by_ktid(self.kt_receiver) assert user, f"Found no user for this portal's receiver of {self.kt_receiver}" if user.is_connected: await self._update_participants(user) else: self.log.debug(f"Not setting _main_intent of new direct chat for disconnected user {user.ktid}") else: self.log.debug("Not setting _main_intent of new direct chat until after checking participant list") @classmethod @async_getter_lock async def get_by_mxid(cls, mxid: RoomID) -> Portal | None: try: return cls.by_mxid[mxid] except KeyError: pass portal = cast(cls, await super().get_by_mxid(mxid)) if portal: await portal.postinit() return portal return None @classmethod @async_getter_lock async def get_by_ktid( cls, ktid: int, *, kt_receiver: int = 0, create: bool = True, kt_type: ChannelType | None = None, ) -> Portal | None: # TODO Find out if direct channels are shared. If so, don't need kt_receiver! if kt_type: kt_receiver = kt_receiver if KnownChannelType.is_direct(kt_type) else 0 ktid_full = (ktid, kt_receiver) try: return cls.by_ktid[ktid_full] except KeyError: pass portal = cast(cls, await super().get_by_ktid(ktid, kt_receiver)) if portal: await portal.postinit() return portal if kt_type and create: portal = cls(ktid=ktid, kt_receiver=kt_receiver, kt_type=kt_type) await portal.insert() await portal.postinit() return portal return None @classmethod async def get_all_by_receiver(cls, kt_receiver: int) -> AsyncGenerator[Portal, None]: portals = await super().get_all_by_receiver(kt_receiver) portal: Portal for portal in portals: try: yield cls.by_ktid[(portal.ktid, portal.kt_receiver)] except KeyError: await portal.postinit() yield portal @classmethod async def all(cls) -> AsyncGenerator[Portal, None]: portals = await super().all() portal: Portal for portal in portals: try: yield cls.by_ktid[(portal.ktid, portal.kt_receiver)] except KeyError: await portal.postinit() yield portal # endregion