Andrew Ferrazzutti
9a82db2257
Also rename "chat" to "channel" for the Message DB, and make its primary key include channel IDs
1964 lines
74 KiB
Python
1964 lines
74 KiB
Python
# matrix-appservice-kakaotalk - A Matrix-KakaoTalk puppeting bridge.
|
|
# Copyright (C) 2022 Tulir Asokan, Andrew Ferrazzutti
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
from __future__ import annotations
|
|
|
|
from typing import (
|
|
TYPE_CHECKING,
|
|
Any,
|
|
AsyncGenerator,
|
|
Awaitable,
|
|
Callable,
|
|
Coroutine,
|
|
Generic,
|
|
Pattern,
|
|
TypeVar,
|
|
cast,
|
|
)
|
|
from io import BytesIO
|
|
from mimetypes import guess_extension
|
|
import asyncio
|
|
import re
|
|
import time
|
|
|
|
from attr import dataclass, evolve
|
|
|
|
from mautrix.appservice import IntentAPI
|
|
from mautrix.bridge import BasePortal, NotificationDisabler, async_getter_lock
|
|
from mautrix.errors import MatrixError, MForbidden, MNotFound, SessionNotFound
|
|
from mautrix.types import (
|
|
AudioInfo,
|
|
ContentURI,
|
|
EncryptedFile,
|
|
EventID,
|
|
EventType,
|
|
FileInfo,
|
|
ImageInfo,
|
|
LocationMessageEventContent,
|
|
MediaInfo,
|
|
MediaMessageEventContent,
|
|
Membership,
|
|
MessageEventContent,
|
|
MessageType,
|
|
PowerLevelStateEventContent,
|
|
RoomAvatarStateEventContent,
|
|
RoomNameStateEventContent,
|
|
RoomTopicStateEventContent,
|
|
RelationType,
|
|
RoomCreatePreset,
|
|
RoomID,
|
|
StateEvent,
|
|
StateEventContent,
|
|
TextMessageEventContent,
|
|
UserID,
|
|
VideoInfo,
|
|
)
|
|
from mautrix.util import ffmpeg, magic
|
|
from mautrix.util.message_send_checkpoint import MessageSendCheckpointStatus
|
|
from mautrix.util.simple_lock import SimpleLock
|
|
|
|
from . import matrix as m, puppet as p, user as u
|
|
from .config import Config
|
|
from .db import (
|
|
Message as DBMessage,
|
|
Portal as DBPortal,
|
|
)
|
|
from .formatter import kakaotalk_to_matrix, matrix_to_kakaotalk
|
|
|
|
from .kt.types.bson import Long
|
|
from .kt.types.channel.channel_info import ChannelInfo
|
|
from .kt.types.channel.channel_type import KnownChannelType, ChannelType
|
|
from .kt.types.chat import Chatlog, ChatType, KnownChatType
|
|
from .kt.types.chat.attachment import (
|
|
Attachment,
|
|
AudioAttachment,
|
|
FileAttachment,
|
|
MediaAttachment,
|
|
MultiPhotoAttachment,
|
|
PhotoAttachment,
|
|
ReplyAttachment,
|
|
VideoAttachment,
|
|
)
|
|
from .kt.types.openlink.open_link_type import OpenChannelUserPerm
|
|
from .kt.types.user.channel_user_info import OpenChannelUserInfo
|
|
|
|
from .kt.client.types import (
|
|
UserInfoUnion,
|
|
PortalChannelInfo,
|
|
PortalChannelParticipantInfo,
|
|
ChannelProps,
|
|
TO_MSGTYPE_MAP,
|
|
FROM_PERM_MAP,
|
|
TO_PERM_MAP,
|
|
)
|
|
from .kt.client.errors import CommandException
|
|
|
|
if TYPE_CHECKING:
|
|
from .__main__ import KakaoTalkBridge
|
|
|
|
try:
|
|
from PIL import Image
|
|
except ImportError:
|
|
Image = None
|
|
|
|
try:
|
|
from mautrix.crypto.attachments import decrypt_attachment, encrypt_attachment
|
|
except ImportError:
|
|
decrypt_attachment = encrypt_attachment = None
|
|
|
|
geo_uri_regex: Pattern = re.compile(r"^geo:(-?\d+.\d+),(-?\d+.\d+)$")
|
|
|
|
|
|
class FakeLock:
|
|
async def __aenter__(self) -> None:
|
|
pass
|
|
|
|
async def __aexit__(self, exc_type, exc, tb) -> None:
|
|
pass
|
|
|
|
|
|
T = TypeVar("T")
|
|
ACallable = Coroutine[None, None, T]
|
|
|
|
StateEventHandlerContentType = TypeVar("StateEventHandlerContentType", bound=StateEventContent)
|
|
|
|
@dataclass
|
|
class StateEventHandler(Generic[StateEventHandlerContentType]):
|
|
apply: Callable[[Portal, u.User, StateEventHandlerContentType, StateEventHandlerContentType], ACallable[None]]
|
|
revert: Callable[[Portal, StateEventHandlerContentType], ACallable[None]]
|
|
action_name: str
|
|
|
|
|
|
StateBridge = EventType.find("m.bridge", EventType.Class.STATE)
|
|
StateHalfShotBridge = EventType.find("uk.half-shot.bridge", EventType.Class.STATE)
|
|
|
|
|
|
class Portal(DBPortal, BasePortal):
|
|
invite_own_puppet_to_pm: bool = False
|
|
by_mxid: dict[RoomID, Portal] = {}
|
|
by_ktid: dict[tuple[int, int], Portal] = {}
|
|
matrix: m.MatrixHandler
|
|
config: Config
|
|
|
|
_main_intent: IntentAPI | None
|
|
_kt_sender: Long | None
|
|
_create_room_lock: asyncio.Lock
|
|
_send_locks: dict[int, asyncio.Lock]
|
|
_noop_lock: FakeLock = FakeLock()
|
|
backfill_lock: SimpleLock
|
|
_backfill_leave: set[IntentAPI] | None
|
|
_sleeping_to_resync: bool
|
|
_scheduled_resync: asyncio.Task | None
|
|
_resync_targets: dict[int, p.Puppet]
|
|
|
|
_CHAT_TYPE_HANDLER_MAP: dict[ChatType, Callable[..., ACallable[list[EventID]]]]
|
|
_STATE_EVENT_HANDLER_MAP: dict[EventType, StateEventHandler]
|
|
|
|
def __init__(
|
|
self,
|
|
ktid: Long,
|
|
kt_receiver: Long,
|
|
kt_type: ChannelType,
|
|
mxid: RoomID | None = None,
|
|
name: str | None = None,
|
|
description: str | None = None,
|
|
photo_id: str | None = None,
|
|
avatar_url: ContentURI | None = None,
|
|
encrypted: bool = False,
|
|
name_set: bool = False,
|
|
topic_set: bool = False,
|
|
avatar_set: bool = False,
|
|
fully_read_kt_chat: Long | None = None,
|
|
relay_user_id: UserID | None = None,
|
|
) -> None:
|
|
super().__init__(
|
|
ktid,
|
|
kt_receiver,
|
|
kt_type,
|
|
mxid,
|
|
name,
|
|
description,
|
|
photo_id,
|
|
avatar_url,
|
|
encrypted,
|
|
name_set,
|
|
topic_set,
|
|
avatar_set,
|
|
fully_read_kt_chat,
|
|
relay_user_id,
|
|
)
|
|
self.log = self.log.getChild(self.ktid_log)
|
|
|
|
self._main_intent = None
|
|
self._kt_sender = None
|
|
self._create_room_lock = asyncio.Lock()
|
|
self._send_locks = {}
|
|
self._sleeping_to_resync = False
|
|
self._scheduled_resync = None
|
|
self._resync_targets = {}
|
|
|
|
self.backfill_lock = SimpleLock(
|
|
"Waiting for backfilling to finish before handling %s", log=self.log
|
|
)
|
|
self._backfill_leave = None
|
|
|
|
self._relay_user = None
|
|
|
|
@classmethod
|
|
def init_cls(cls, bridge: KakaoTalkBridge) -> None:
|
|
BasePortal.bridge = bridge
|
|
cls.az = bridge.az
|
|
cls.config = bridge.config
|
|
cls.loop = bridge.loop
|
|
cls.matrix = bridge.matrix
|
|
cls.invite_own_puppet_to_pm = cls.config["bridge.invite_own_puppet_to_pm"]
|
|
NotificationDisabler.puppet_cls = p.Puppet
|
|
NotificationDisabler.config_enabled = cls.config["bridge.backfill.disable_notifications"]
|
|
|
|
cls._CHAT_TYPE_HANDLER_MAP = {
|
|
KnownChatType.FEED: cls._handle_kakaotalk_feed,
|
|
KnownChatType.TEXT: cls._handle_kakaotalk_text,
|
|
KnownChatType.REPLY: cls._handle_kakaotalk_reply,
|
|
KnownChatType.PHOTO: cls._handle_kakaotalk_photo,
|
|
KnownChatType.MULTIPHOTO: cls._handle_kakaotalk_multiphoto,
|
|
KnownChatType.VIDEO: cls._handle_kakaotalk_video,
|
|
KnownChatType.AUDIO: cls._handle_kakaotalk_audio,
|
|
KnownChatType.FILE: cls._handle_kakaotalk_file,
|
|
16385: cls._handle_kakaotalk_deleted,
|
|
}
|
|
|
|
cls._STATE_EVENT_HANDLER_MAP = {
|
|
EventType.ROOM_POWER_LEVELS: StateEventHandler(
|
|
cls._handle_matrix_power_levels,
|
|
cls._revert_matrix_power_levels,
|
|
"power level"
|
|
),
|
|
EventType.ROOM_NAME: StateEventHandler(
|
|
cls._handle_matrix_room_name,
|
|
cls._revert_matrix_room_name,
|
|
"room name"
|
|
),
|
|
EventType.ROOM_TOPIC: StateEventHandler(
|
|
cls._handle_matrix_room_topic,
|
|
cls._revert_matrix_room_topic,
|
|
"room topic"
|
|
),
|
|
EventType.ROOM_AVATAR: StateEventHandler(
|
|
cls._handle_matrix_room_avatar,
|
|
cls._revert_matrix_room_avatar,
|
|
"room avatar"
|
|
),
|
|
}
|
|
|
|
@classmethod
|
|
def supports_state_event(cls, evt_type: EventType) -> bool:
|
|
return evt_type in cls._STATE_EVENT_HANDLER_MAP
|
|
|
|
# region DB conversion
|
|
|
|
async def delete(self) -> None:
|
|
if self.mxid:
|
|
await DBMessage.delete_all_by_room(self.mxid)
|
|
self.by_mxid.pop(self.mxid, None)
|
|
self.by_ktid.pop(self.ktid_full, None)
|
|
self.mxid = None
|
|
self.name_set = False
|
|
self.avatar_set = False
|
|
self.relay_user_id = None
|
|
self.encrypted = False
|
|
await super().save()
|
|
|
|
# endregion
|
|
# region Properties
|
|
|
|
@property
|
|
def ktid_full(self) -> tuple[Long, Long]:
|
|
return self.ktid, self.kt_receiver
|
|
|
|
@property
|
|
def ktid_log(self) -> str:
|
|
if self.is_direct:
|
|
return f"{self.ktid}->{self.kt_receiver}"
|
|
return str(self.ktid)
|
|
|
|
@property
|
|
def is_direct(self) -> bool:
|
|
return KnownChannelType.is_direct(self.kt_type)
|
|
|
|
@property
|
|
def is_open(self) -> bool:
|
|
return KnownChannelType.is_open(self.kt_type)
|
|
|
|
@property
|
|
def kt_sender(self) -> int | None:
|
|
if self.is_direct:
|
|
if not self._kt_sender:
|
|
raise ValueError("Direct chat portal must set sender")
|
|
else:
|
|
if self._kt_sender:
|
|
raise ValueError(f"Non-direct chat portal should have no sender, but has sender {self._kt_sender}")
|
|
return self._kt_sender
|
|
|
|
@property
|
|
def channel_props(self) -> ChannelProps:
|
|
return ChannelProps(
|
|
id=self.ktid,
|
|
type=self.kt_type
|
|
)
|
|
|
|
@property
|
|
def main_intent(self) -> IntentAPI:
|
|
if not self._main_intent:
|
|
raise ValueError("Portal must be postinit()ed before main_intent can be used")
|
|
return self._main_intent
|
|
|
|
async def get_dm_puppet(self) -> p.Puppet | None:
|
|
if not self.is_direct:
|
|
return None
|
|
return await p.Puppet.get_by_ktid(self.kt_sender)
|
|
|
|
# endregion
|
|
# region Chat info updating
|
|
|
|
def schedule_resync(self, source: u.User, target: p.Puppet) -> None:
|
|
self._resync_targets[target.ktid] = target
|
|
if (
|
|
self._sleeping_to_resync
|
|
and self._scheduled_resync
|
|
and not self._scheduled_resync.done()
|
|
):
|
|
return
|
|
self._sleeping_to_resync = True
|
|
self.log.debug(f"Scheduling resync through {source.mxid}/{source.ktid}")
|
|
self._scheduled_resync = asyncio.create_task(self._sleep_and_resync(source, 10))
|
|
|
|
async def _sleep_and_resync(self, source: u.User, sleep: int) -> None:
|
|
await asyncio.sleep(sleep)
|
|
targets = self._resync_targets
|
|
self._sleeping_to_resync = False
|
|
self._resync_targets = {}
|
|
for puppet in targets.values():
|
|
if not puppet.name or not puppet.name_set:
|
|
break
|
|
else:
|
|
self.log.debug(
|
|
f"Cancelled resync through {source.mxid}/{source.ktid}, all puppets have names"
|
|
)
|
|
return
|
|
self.log.debug(f"Resyncing chat through {source.mxid}/{source.ktid} after sleeping")
|
|
await self.update_info(source)
|
|
self._scheduled_resync = None
|
|
self.log.debug(f"Completed scheduled resync through {source.mxid}/{source.ktid}")
|
|
|
|
async def update_info(
|
|
self,
|
|
source: u.User,
|
|
info: PortalChannelInfo | None = None,
|
|
force_save: bool = False,
|
|
) -> PortalChannelInfo:
|
|
if not info:
|
|
self.log.debug("Called update_info with no info, fetching it now...")
|
|
info = await source.client.get_portal_channel_info(self.channel_props)
|
|
changed = False
|
|
if not self.is_direct:
|
|
changed = any(
|
|
await asyncio.gather(
|
|
self._update_name(info.name),
|
|
self._update_description(info.description),
|
|
self._update_photo(source, info.photoURL),
|
|
)
|
|
)
|
|
if info.participantInfo:
|
|
changed = await self._update_participants(source, info.participantInfo) or changed
|
|
if changed or force_save:
|
|
await self.update_bridge_info()
|
|
await self.save()
|
|
return info
|
|
|
|
async def _get_mapped_participant_power_levels(self, participants: list[UserInfoUnion]) -> dict[UserID, int]:
|
|
user_power_levels: dict[UserID, int] = {}
|
|
for participant in participants:
|
|
if not isinstance(participant, OpenChannelUserInfo):
|
|
self.log.warning(f"Info object for participant {participant.userId} of open channel is not an OpenChannelUserInfo")
|
|
continue
|
|
await self._update_mapped_ktid_power_levels(user_power_levels, participant.userId, participant.perm)
|
|
return user_power_levels
|
|
|
|
@staticmethod
|
|
async def _update_mapped_ktid_power_levels(user_power_levels: dict[UserID, int], ktid: int, perm: OpenChannelUserPerm) -> None:
|
|
power_level = FROM_PERM_MAP[perm]
|
|
user = await u.User.get_by_ktid(ktid)
|
|
if user:
|
|
user_power_levels[user.mxid] = power_level
|
|
puppet = await p.Puppet.get_by_ktid(ktid)
|
|
if puppet:
|
|
user_power_levels[puppet.mxid] = power_level
|
|
|
|
async def _set_user_power_levels(self, sender: p.Puppet | None, user_power_levels: dict[UserID, int]) -> None:
|
|
if not self.mxid:
|
|
return
|
|
orig_power_levels = await self.main_intent.get_power_levels(self.mxid)
|
|
user_power_levels = {k: v for k, v in user_power_levels.items() if orig_power_levels.get_user_level(k) != v}
|
|
if not user_power_levels:
|
|
return
|
|
joined_puppets = {
|
|
puppet.mxid: puppet for puppet in [
|
|
await p.Puppet.get_by_custom_mxid(mxid) or await p.Puppet.get_by_mxid(mxid)
|
|
for mxid in await self.main_intent.get_room_members(self.mxid)
|
|
] if puppet
|
|
}
|
|
sender_intent = sender.intent_for(self) if sender else self.main_intent
|
|
admin_level = orig_power_levels.get_user_level(sender_intent.mxid)
|
|
demoter_ids: list[UserID] = []
|
|
power_levels = evolve(orig_power_levels)
|
|
for user_id, new_level in user_power_levels.items():
|
|
curr_level = orig_power_levels.get_user_level(user_id)
|
|
if curr_level < admin_level or user_id == sender_intent.mxid:
|
|
# TODO Consider capping the power level here, instead of letting the attempt fail later
|
|
power_levels.set_user_level(user_id, new_level)
|
|
elif user_id in joined_puppets:
|
|
demoter_ids.append(user_id)
|
|
else:
|
|
# This is either a non-joined puppet or a non-puppet user
|
|
self.log.warning(f"Can't change power level of more powerful user {user_id}")
|
|
try:
|
|
await sender_intent.set_power_levels(self.mxid, power_levels)
|
|
except:
|
|
self.log.exception("Failed to set power level")
|
|
if demoter_ids:
|
|
power_levels = evolve(orig_power_levels)
|
|
for demoter_id in demoter_ids:
|
|
power_levels.set_user_level(demoter_id, user_power_levels[demoter_id])
|
|
try:
|
|
await joined_puppets[demoter_id].intent_for(self).set_power_levels(self.mxid, power_levels)
|
|
except:
|
|
self.log.exception("Failed to set power level")
|
|
power_levels.set_user_level(demoter_id, orig_power_levels[demoter_id])
|
|
|
|
@classmethod
|
|
async def _reupload_kakaotalk_file_from_url(
|
|
cls,
|
|
url: str,
|
|
source: u.User,
|
|
intent: IntentAPI,
|
|
*,
|
|
filename: str | None = None,
|
|
mimetype: str | None,
|
|
encrypt: bool = False,
|
|
find_size: bool = False,
|
|
convert_audio: bool = False,
|
|
) -> tuple[ContentURI, FileInfo | VideoInfo | AudioInfo | ImageInfo, EncryptedFile | None]:
|
|
if not url:
|
|
raise ValueError("URL not provided")
|
|
sandbox = cls.config["bridge.sandbox_media_download"]
|
|
# TODO Referer header?
|
|
async with source.client.get(url, sandbox=sandbox) as resp:
|
|
length = int(resp.headers["Content-Length"])
|
|
if length > cls.matrix.media_config.upload_size:
|
|
raise ValueError("File not available: too large")
|
|
data = await resp.read()
|
|
return await cls._reupload_kakaotalk_file_from_bytes(
|
|
data,
|
|
intent,
|
|
filename=filename,
|
|
mimetype=mimetype,
|
|
encrypt=encrypt,
|
|
find_size=find_size,
|
|
convert_audio=convert_audio,
|
|
)
|
|
|
|
@classmethod
|
|
async def _reupload_kakaotalk_file_from_bytes(
|
|
cls,
|
|
data: bytes,
|
|
intent: IntentAPI,
|
|
*,
|
|
filename: str | None = None,
|
|
mimetype: str | None = None,
|
|
encrypt: bool = False,
|
|
find_size: bool = False,
|
|
convert_audio: bool = False,
|
|
) -> tuple[ContentURI, FileInfo | VideoInfo | AudioInfo | ImageInfo, EncryptedFile | None]:
|
|
if not mimetype:
|
|
mimetype = magic.mimetype(data)
|
|
if convert_audio and mimetype != "audio/ogg":
|
|
data = await ffmpeg.convert_bytes(
|
|
data, ".ogg", output_args=("-c:a", "libopus"), input_mime=mimetype
|
|
)
|
|
mimetype = "audio/ogg"
|
|
info = FileInfo(mimetype=mimetype, size=len(data))
|
|
if Image and mimetype.startswith("image/") and find_size:
|
|
with Image.open(BytesIO(data)) as img:
|
|
width, height = img.size
|
|
info = ImageInfo(mimetype=mimetype, size=len(data), width=width, height=height)
|
|
upload_mime_type = mimetype
|
|
decryption_info = None
|
|
if encrypt and encrypt_attachment:
|
|
data, decryption_info = encrypt_attachment(data)
|
|
upload_mime_type = "application/octet-stream"
|
|
filename = None
|
|
url = await intent.upload_media(
|
|
data,
|
|
mime_type=upload_mime_type,
|
|
filename=filename,
|
|
async_upload=cls.config["homeserver.async_media"],
|
|
)
|
|
if decryption_info:
|
|
decryption_info.url = url
|
|
return url, info, decryption_info
|
|
|
|
async def _update_name(self, name: str | None) -> bool:
|
|
if not name:
|
|
self.log.warning("Got empty name in _update_name call")
|
|
return False
|
|
if self.name != name or not self.name_set:
|
|
self.log.trace("Updating name %s -> %s", self.name, name)
|
|
self.name = name
|
|
if self.mxid and (self.encrypted or not self.is_direct):
|
|
try:
|
|
await self.main_intent.set_room_name(self.mxid, self.name)
|
|
self.name_set = True
|
|
except Exception:
|
|
self.log.exception("Failed to set room name")
|
|
self.name_set = False
|
|
return True
|
|
return False
|
|
|
|
async def _update_description(self, description: str | None) -> bool:
|
|
if self.description != description or not self.topic_set:
|
|
self.log.trace("Updating description %s -> %s", self.description, description)
|
|
self.description = description
|
|
if self.mxid and (self.encrypted or not self.is_direct):
|
|
try:
|
|
await self.main_intent.set_room_topic(self.mxid, self.description or "")
|
|
self.topic_set = True
|
|
except Exception:
|
|
self.log.exception("Failed to set room description")
|
|
self.topic_set = False
|
|
return True
|
|
return False
|
|
|
|
async def _update_photo(self, source: u.User, photo_id: str | None) -> bool:
|
|
if self.is_direct and not self.encrypted:
|
|
return False
|
|
if self.photo_id is not None and photo_id is None:
|
|
self.log.warning("Portal previously had a photo_id, but new photo_id is None. Leaving it as it is")
|
|
return False
|
|
if self.photo_id != photo_id or not self.avatar_set:
|
|
self.photo_id = photo_id
|
|
if photo_id:
|
|
if self.photo_id != photo_id or not self.avatar_url:
|
|
# Reset avatar_url first in case the upload fails
|
|
self.avatar_url = None
|
|
self.avatar_url = await p.Puppet.reupload_avatar(
|
|
source,
|
|
self.main_intent,
|
|
photo_id,
|
|
self.ktid,
|
|
)
|
|
else:
|
|
self.avatar_url = ContentURI("")
|
|
if self.mxid:
|
|
try:
|
|
await self.main_intent.set_room_avatar(self.mxid, self.avatar_url)
|
|
self.avatar_set = True
|
|
except Exception:
|
|
self.log.exception("Failed to set room avatar")
|
|
self.avatar_set = False
|
|
return True
|
|
return False
|
|
|
|
async def _update_photo_from_puppet(self, puppet: p.Puppet) -> bool:
|
|
if self.photo_id == puppet.photo_id and self.avatar_set:
|
|
return False
|
|
self.photo_id = puppet.photo_id
|
|
if puppet.photo_mxc:
|
|
self.avatar_url = puppet.photo_mxc
|
|
elif self.photo_id:
|
|
profile = await self.main_intent.get_profile(puppet.default_mxid)
|
|
self.avatar_url = profile.avatar_url
|
|
puppet.photo_mxc = profile.avatar_url
|
|
else:
|
|
self.avatar_url = ContentURI("")
|
|
if self.mxid:
|
|
try:
|
|
await self.main_intent.set_room_avatar(self.mxid, self.avatar_url)
|
|
self.avatar_set = True
|
|
except Exception:
|
|
self.log.exception("Failed to set room avatar")
|
|
self.avatar_set = False
|
|
return True
|
|
|
|
async def update_info_from_puppet(self, puppet: p.Puppet | None = None) -> bool:
|
|
if not self.is_direct:
|
|
return False
|
|
if not puppet:
|
|
puppet = await self.get_dm_puppet()
|
|
if not puppet:
|
|
return False
|
|
changed = await self._update_name(puppet.name)
|
|
changed = await self._update_photo_from_puppet(puppet) or changed
|
|
return changed
|
|
|
|
"""
|
|
async def sync_per_room_nick(self, puppet: p.Puppet, name: str) -> None:
|
|
intent = puppet.intent_for(self)
|
|
content = MemberStateEventContent(
|
|
membership=Membership.JOIN,
|
|
avatar_url=puppet.photo_mxc,
|
|
displayname=name or puppet.name,
|
|
)
|
|
content[DOUBLE_PUPPET_SOURCE_KEY] = self.bridge.name
|
|
current_state = await intent.state_store.get_member(self.mxid, intent.mxid)
|
|
if not current_state or current_state.displayname != content.displayname:
|
|
self.log.debug(
|
|
"Syncing %s's per-room nick %s to the room",
|
|
puppet.ktid,
|
|
content.displayname,
|
|
)
|
|
await intent.send_state_event(
|
|
self.mxid, EventType.ROOM_MEMBER, content, state_key=intent.mxid
|
|
)
|
|
"""
|
|
|
|
async def _update_participant(
|
|
self, source: u.User, participant: UserInfoUnion
|
|
) -> bool:
|
|
# TODO nick map?
|
|
self.log.trace(f"Syncing participant {participant.userId}")
|
|
puppet = await p.Puppet.get_by_ktid(participant.userId)
|
|
await puppet.update_info_from_participant(source, participant)
|
|
changed = False
|
|
if self.is_direct and self._kt_sender == puppet.ktid and self.encrypted:
|
|
changed = await self.update_info_from_puppet(puppet.name) or changed
|
|
if self.mxid:
|
|
if puppet.ktid != self.kt_receiver or puppet.is_real_user:
|
|
await puppet.intent_for(self).ensure_joined(self.mxid, bot=self.main_intent)
|
|
#if puppet.ktid in nick_map:
|
|
# await self.sync_per_room_nick(puppet, nick_map[puppet.ktid])
|
|
return changed
|
|
|
|
async def _update_participants(
|
|
self,
|
|
source: u.User,
|
|
participant_info: PortalChannelParticipantInfo | None = None,
|
|
) -> bool:
|
|
# NOTE This handles only non-logged-in users, because logged-in users should be handled by the channel list listeners
|
|
# TODO nick map?
|
|
if participant_info is None:
|
|
self.log.debug("Called _update_participants with no participant info, fetching it now...")
|
|
participant_info = await source.client.get_portal_channel_participant_info(self.channel_props)
|
|
if self.mxid:
|
|
# NOTE KakaoTalk kick = Matrix ban
|
|
prev_banned_mxids = {
|
|
cast(UserID, event.state_key)
|
|
for event in await self.main_intent.get_members(self.mxid, membership=Membership.BAN)
|
|
}
|
|
results = await asyncio.gather(*[
|
|
self.handle_kakaotalk_user_left(source, None, puppet)
|
|
for puppet in [
|
|
await p.Puppet.get_by_ktid(ktid)
|
|
for ktid in participant_info.kickedUserIds
|
|
]
|
|
if puppet and puppet.mxid not in prev_banned_mxids
|
|
], return_exceptions=True)
|
|
for e in filter(lambda x: isinstance(x, Exception), results):
|
|
self.log.exception(e)
|
|
|
|
joined_ktids = {pcp.userId for pcp in participant_info.participants}
|
|
results = await asyncio.gather(*[
|
|
self.handle_kakaotalk_user_left(source, puppet, puppet)
|
|
for puppet in [
|
|
await p.Puppet.get_by_mxid(mxid)
|
|
for mxid in await self.main_intent.get_room_members(self.mxid)
|
|
]
|
|
if puppet and puppet.ktid not in joined_ktids
|
|
], return_exceptions=True)
|
|
for e in filter(lambda x: isinstance(x, Exception), results):
|
|
self.log.exception(e)
|
|
|
|
kicked_ktids = set(participant_info.kickedUserIds)
|
|
results = await asyncio.gather(*[
|
|
self.handle_kakaotalk_user_unkick(source, None, puppet)
|
|
for puppet in [
|
|
await p.Puppet.get_by_mxid(mxid)
|
|
for mxid in prev_banned_mxids
|
|
]
|
|
if puppet and puppet.ktid not in kicked_ktids
|
|
], return_exceptions=True)
|
|
for e in filter(lambda x: isinstance(x, Exception), results):
|
|
self.log.exception(e)
|
|
|
|
changed = False
|
|
results = await asyncio.gather(*[
|
|
self._update_participant(source, pcp) for pcp in participant_info.participants
|
|
], return_exceptions=True)
|
|
for result in results:
|
|
if isinstance(result, Exception):
|
|
self.log.exception(result)
|
|
else:
|
|
changed = result or changed
|
|
|
|
if self.mxid and self.is_open:
|
|
# TODO Find whether perms apply to any non-direct channel, or just open ones
|
|
user_power_levels = await self._get_mapped_participant_power_levels(participant_info.participants)
|
|
await self._set_user_power_levels(None, user_power_levels)
|
|
|
|
return changed
|
|
|
|
# endregion
|
|
# region Matrix room creation
|
|
|
|
async def update_matrix_room(self, source: u.User, info: PortalChannelInfo | None = None) -> None:
|
|
try:
|
|
await self._update_matrix_room(source, info)
|
|
except Exception:
|
|
self.log.exception("Failed to update portal")
|
|
|
|
def _get_invite_content(self, double_puppet: p.Puppet | None) -> dict[str, Any]:
|
|
invite_content = {}
|
|
if double_puppet:
|
|
invite_content["fi.mau.will_auto_accept"] = True
|
|
if self.is_direct:
|
|
invite_content["is_direct"] = True
|
|
return invite_content
|
|
|
|
async def _update_matrix_room(
|
|
self, source: u.User, info: PortalChannelInfo | None = None
|
|
) -> None:
|
|
puppet = await p.Puppet.get_by_custom_mxid(source.mxid)
|
|
await self.main_intent.invite_user(
|
|
self.mxid,
|
|
source.mxid,
|
|
check_cache=True,
|
|
extra_content=self._get_invite_content(puppet),
|
|
)
|
|
if puppet:
|
|
did_join = await puppet.intent.ensure_joined(self.mxid)
|
|
if did_join and self.is_direct:
|
|
await source.update_direct_chats({self.main_intent.mxid: [self.mxid]})
|
|
|
|
info = await self.update_info(source, info)
|
|
|
|
await self._sync_read_receipts(source)
|
|
|
|
async def _sync_read_receipts(self, source: u.User) -> None:
|
|
messages = await DBMessage.get_all_since(self.ktid, self.kt_receiver, self.fully_read_kt_chat)
|
|
receipts = await source.client.get_read_receipts(
|
|
self.channel_props,
|
|
[m.ktid for m in messages if m.ktid]
|
|
)
|
|
if not receipts:
|
|
return
|
|
for receipt in receipts:
|
|
message = await DBMessage.get_closest_before(
|
|
self.ktid, self.kt_receiver, receipt.chatId
|
|
)
|
|
if not message:
|
|
continue
|
|
puppet = await p.Puppet.get_by_ktid(receipt.userId, create=False)
|
|
if not puppet:
|
|
continue
|
|
try:
|
|
await puppet.intent_for(self).mark_read(message.mx_room, message.mxid)
|
|
except Exception:
|
|
self.log.warning(
|
|
f"Failed to mark {message.mxid} in {message.mx_room} "
|
|
f"as read by {puppet.intent.mxid}",
|
|
exc_info=True,
|
|
)
|
|
fully_read_kt_chat = min(receipt.chatId for receipt in receipts)
|
|
if not self.fully_read_kt_chat or self.fully_read_kt_chat < fully_read_kt_chat:
|
|
self.fully_read_kt_chat = fully_read_kt_chat
|
|
await self.save()
|
|
|
|
async def create_matrix_room(
|
|
self, source: u.User, info: PortalChannelInfo | None = None
|
|
) -> RoomID | None:
|
|
if self.mxid:
|
|
try:
|
|
await self._update_matrix_room(source, info)
|
|
except Exception:
|
|
self.log.exception("Failed to update portal")
|
|
return self.mxid
|
|
async with self._create_room_lock:
|
|
try:
|
|
return await self._create_matrix_room(source, info)
|
|
except Exception:
|
|
self.log.exception("Failed to create portal")
|
|
return None
|
|
|
|
@property
|
|
def bridge_info_state_key(self) -> str:
|
|
return f"net.miscworks.kakaotalk://kakaotalk/{self.ktid}"
|
|
|
|
@property
|
|
def bridge_info(self) -> dict[str, Any]:
|
|
return {
|
|
"bridgebot": self.az.bot_mxid,
|
|
"creator": self.main_intent.mxid,
|
|
"protocol": {
|
|
"id": "kakaotalk",
|
|
"displayname": "KakaoTalk",
|
|
"avatar_url": self.config["appservice.bot_avatar"],
|
|
},
|
|
"channel": {
|
|
"id": str(self.ktid),
|
|
"displayname": self.name,
|
|
"avatar_url": self.avatar_url,
|
|
},
|
|
}
|
|
|
|
async def update_bridge_info(self) -> None:
|
|
if not self.mxid:
|
|
self.log.debug("Not updating bridge info: no Matrix room created")
|
|
return
|
|
try:
|
|
self.log.debug("Updating bridge info...")
|
|
await self.main_intent.send_state_event(
|
|
self.mxid, StateBridge, self.bridge_info, self.bridge_info_state_key
|
|
)
|
|
# TODO remove this once https://github.com/matrix-org/matrix-doc/pull/2346 is in spec
|
|
await self.main_intent.send_state_event(
|
|
self.mxid, StateHalfShotBridge, self.bridge_info, self.bridge_info_state_key
|
|
)
|
|
except Exception:
|
|
self.log.warning("Failed to update bridge info", exc_info=True)
|
|
|
|
async def _create_matrix_room(
|
|
self, source: u.User, info: PortalChannelInfo | None = None
|
|
) -> RoomID:
|
|
if self.mxid:
|
|
await self._update_matrix_room(source, info=info)
|
|
return self.mxid
|
|
|
|
self.log.debug(f"Creating Matrix room")
|
|
|
|
if self.is_direct:
|
|
# NOTE Must do this to find the other member of the DM, since the channel ID != the member's ID!
|
|
if not info or not info.participantInfo:
|
|
info = await source.client.get_portal_channel_info(self.channel_props)
|
|
assert info.participantInfo
|
|
await self._update_participants(source, info.participantInfo)
|
|
|
|
name: str | None = None
|
|
description: str | None = None
|
|
initial_state = [
|
|
{
|
|
"type": str(StateBridge),
|
|
"state_key": self.bridge_info_state_key,
|
|
"content": self.bridge_info,
|
|
},
|
|
# TODO remove this once https://github.com/matrix-org/matrix-doc/pull/2346 is in spec
|
|
{
|
|
"type": str(StateHalfShotBridge),
|
|
"state_key": self.bridge_info_state_key,
|
|
"content": self.bridge_info,
|
|
},
|
|
]
|
|
|
|
if self.is_open:
|
|
preset = RoomCreatePreset.PUBLIC
|
|
# TODO Find whether perms apply to any non-direct channel, or just open ones
|
|
if not info or not info.participantInfo:
|
|
info = await source.client.get_portal_channel_info(self.channel_props)
|
|
assert info.participantInfo
|
|
user_power_levels = await self._get_mapped_participant_power_levels(info.participantInfo.participants)
|
|
# NOTE Giving the bot a +1 power level if necessary so it can demote non-puppet admins
|
|
user_power_levels[self.main_intent.mxid] = max(100, 1 + FROM_PERM_MAP[OpenChannelUserPerm.OWNER])
|
|
power_level_override = PowerLevelStateEventContent(users=user_power_levels)
|
|
else:
|
|
preset = RoomCreatePreset.PRIVATE
|
|
power_level_override = None
|
|
|
|
invites = []
|
|
if self.config["bridge.encryption.default"] and self.matrix.e2ee:
|
|
self.encrypted = True
|
|
initial_state.append(
|
|
{
|
|
"type": "m.room.encryption",
|
|
"content": {"algorithm": "m.megolm.v1.aes-sha2"},
|
|
}
|
|
)
|
|
if self.is_direct:
|
|
invites.append(self.az.bot_mxid)
|
|
|
|
info = await self.update_info(source=source, info=info)
|
|
|
|
if self.encrypted or not self.is_direct:
|
|
name = self.name
|
|
description = self.description
|
|
initial_state.append(
|
|
{
|
|
"type": str(EventType.ROOM_AVATAR),
|
|
"content": {"url": self.avatar_url},
|
|
}
|
|
)
|
|
|
|
# We lock backfill lock here so any messages that come between the room being created
|
|
# and the initial backfill finishing wouldn't be bridged before the backfill messages.
|
|
with self.backfill_lock:
|
|
creation_content = {}
|
|
if not self.config["bridge.federate_rooms"]:
|
|
creation_content["m.federate"] = False
|
|
self.mxid = await self.main_intent.create_room(
|
|
preset=preset,
|
|
name=name,
|
|
topic=description,
|
|
is_direct=self.is_direct,
|
|
initial_state=initial_state,
|
|
invitees=invites,
|
|
creation_content=creation_content,
|
|
power_level_override=power_level_override,
|
|
)
|
|
if not self.mxid:
|
|
raise Exception("Failed to create room: no mxid returned")
|
|
|
|
if self.encrypted and self.matrix.e2ee and self.is_direct:
|
|
try:
|
|
await self.az.intent.ensure_joined(self.mxid)
|
|
except Exception:
|
|
self.log.warning(f"Failed to add bridge bot to new private chat {self.mxid}")
|
|
|
|
await self.save()
|
|
self.log.debug(f"Matrix room created: {self.mxid}")
|
|
self.by_mxid[self.mxid] = self
|
|
|
|
puppet = await p.Puppet.get_by_custom_mxid(source.mxid)
|
|
await self.main_intent.invite_user(
|
|
self.mxid, source.mxid, extra_content=self._get_invite_content(puppet)
|
|
)
|
|
if puppet:
|
|
try:
|
|
if self.is_direct:
|
|
await source.update_direct_chats({self.main_intent.mxid: [self.mxid]})
|
|
await puppet.intent.join_room_by_id(self.mxid)
|
|
except MatrixError:
|
|
self.log.debug(
|
|
"Failed to join custom puppet into newly created portal",
|
|
exc_info=True,
|
|
)
|
|
|
|
if not self.is_direct:
|
|
# NOTE Calling this after room creation to invite participants
|
|
await self._update_participants(source, info.participantInfo)
|
|
|
|
if info.channel_info:
|
|
try:
|
|
await self.backfill(source, is_initial=True, channel_info=info.channel_info)
|
|
# NOTE This also syncs read receipts
|
|
except Exception:
|
|
self.log.exception("Failed to backfill new portal")
|
|
|
|
return self.mxid
|
|
|
|
# endregion
|
|
# region Matrix event handling
|
|
|
|
def require_send_lock(self, user_id: int) -> asyncio.Lock:
|
|
try:
|
|
lock = self._send_locks[user_id]
|
|
except KeyError:
|
|
lock = asyncio.Lock()
|
|
self._send_locks[user_id] = lock
|
|
return lock
|
|
|
|
def optional_send_lock(self, user_id: int) -> asyncio.Lock | FakeLock:
|
|
try:
|
|
return self._send_locks[user_id]
|
|
except KeyError:
|
|
pass
|
|
return self._noop_lock
|
|
|
|
async def _send_delivery_receipt(self, event_id: EventID) -> None:
|
|
if event_id and self.config["bridge.delivery_receipts"]:
|
|
try:
|
|
await self.az.intent.mark_read(self.mxid, event_id)
|
|
except Exception:
|
|
self.log.exception(f"Failed to send delivery receipt for {event_id}")
|
|
|
|
async def _send_bridge_error(self, msg: str, thing: str = "message") -> None:
|
|
await self._send_message(
|
|
self.main_intent,
|
|
TextMessageEventContent(
|
|
msgtype=MessageType.NOTICE,
|
|
body=f"\u26a0 Your {thing} may not have been bridged: {msg}",
|
|
),
|
|
)
|
|
|
|
def _status_from_exception(self, e: Exception) -> MessageSendCheckpointStatus:
|
|
if isinstance(e, NotImplementedError):
|
|
return MessageSendCheckpointStatus.UNSUPPORTED
|
|
return MessageSendCheckpointStatus.PERM_FAILURE
|
|
|
|
async def handle_matrix_message(
|
|
self, sender: u.User, message: MessageEventContent, event_id: EventID
|
|
) -> None:
|
|
try:
|
|
await self._handle_matrix_message(sender, message, event_id)
|
|
except Exception as e:
|
|
self.log.exception(f"Failed to handle Matrix event {event_id}: {e}")
|
|
sender.send_remote_checkpoint(
|
|
self._status_from_exception(e),
|
|
event_id,
|
|
self.mxid,
|
|
EventType.ROOM_MESSAGE,
|
|
message.msgtype,
|
|
error=e,
|
|
)
|
|
await self._send_bridge_error(str(e))
|
|
else:
|
|
await self._send_delivery_receipt(event_id)
|
|
|
|
async def _handle_matrix_message(
|
|
self, orig_sender: u.User, message: MessageEventContent, event_id: EventID
|
|
) -> None:
|
|
if message.get_edit():
|
|
raise NotImplementedError("Edits are not supported by the KakaoTalk bridge.")
|
|
if message.relates_to.rel_type == RelationType.REPLY and not message.msgtype.is_text:
|
|
raise NotImplementedError("Replying with non-text content is not supported by the KakaoTalk bridge.")
|
|
sender, is_relay = await self.get_relay_sender(orig_sender, f"message {event_id}")
|
|
if not sender:
|
|
raise Exception("not logged in")
|
|
elif not sender.is_connected:
|
|
raise Exception("not connected to KakaoTalk chats")
|
|
elif is_relay:
|
|
if not message.msgtype.is_text:
|
|
intro_message = TextMessageEventContent(msgtype=MessageType.TEXT, body=message.body)
|
|
await self.apply_relay_message_format(orig_sender, intro_message)
|
|
await self._send_chat(sender, intro_message)
|
|
else:
|
|
await self.apply_relay_message_format(orig_sender, message)
|
|
if message.msgtype == MessageType.TEXT or message.msgtype == MessageType.NOTICE:
|
|
await self._handle_matrix_text(event_id, sender, message)
|
|
elif message.msgtype.is_media:
|
|
await self._handle_matrix_media(event_id, sender, message)
|
|
# elif message.msgtype == MessageType.LOCATION:
|
|
# await self._handle_matrix_location(sender, message)
|
|
else:
|
|
raise NotImplementedError(f"Unsupported message type {message.msgtype}")
|
|
|
|
async def _send_chat(
|
|
self, sender: u.User, message: TextMessageEventContent, event_id: EventID | None = None
|
|
) -> Long:
|
|
converted = await matrix_to_kakaotalk(message, self.mxid, self.log, self)
|
|
try:
|
|
return await sender.client.send_chat(
|
|
self.channel_props,
|
|
text=converted.text,
|
|
reply_to=converted.reply_to,
|
|
mentions=converted.mentions,
|
|
)
|
|
except CommandException as e:
|
|
self.log.debug(f"Error handling Matrix message {event_id if event_id else '<extra>'}: {e!s}")
|
|
raise
|
|
|
|
async def _make_dbm(self, event_id: EventID, ktid: Long | None = None) -> DBMessage:
|
|
dbm = DBMessage(
|
|
mxid=event_id,
|
|
mx_room=self.mxid,
|
|
ktid=ktid,
|
|
index=0,
|
|
kt_channel=self.ktid,
|
|
kt_receiver=self.kt_receiver,
|
|
timestamp=int(time.time() * 1000),
|
|
)
|
|
await dbm.insert()
|
|
return dbm
|
|
|
|
async def _handle_matrix_text(
|
|
self, event_id: EventID, sender: u.User, message: TextMessageEventContent
|
|
) -> None:
|
|
log_id = await self._send_chat(sender, message, event_id)
|
|
await self._make_dbm(event_id, log_id)
|
|
self.log.debug(f"Handled Matrix message {event_id} -> {log_id}")
|
|
sender.send_remote_checkpoint(
|
|
MessageSendCheckpointStatus.SUCCESS,
|
|
event_id,
|
|
self.mxid,
|
|
EventType.ROOM_MESSAGE,
|
|
message.msgtype,
|
|
)
|
|
|
|
async def _handle_matrix_media(
|
|
self, event_id: EventID, sender: u.User, message: MediaMessageEventContent
|
|
) -> None:
|
|
if message.file and decrypt_attachment:
|
|
data = await self.main_intent.download_media(message.file.url)
|
|
data = decrypt_attachment(
|
|
data, message.file.key.key, message.file.hashes.get("sha256"), message.file.iv
|
|
)
|
|
elif message.url:
|
|
data = await self.main_intent.download_media(message.url)
|
|
else:
|
|
raise NotImplementedError("No file or URL specified")
|
|
mimetype = message.info.mimetype or magic.mimetype(data)
|
|
filename = message.body
|
|
width, height = None, None
|
|
if message.msgtype in (MessageType.IMAGE, MessageType.STICKER, MessageType.VIDEO):
|
|
width = message.info.width
|
|
height = message.info.height
|
|
try:
|
|
ext = guess_extension(mimetype)
|
|
log_id = await sender.client.send_media(
|
|
self.channel_props,
|
|
TO_MSGTYPE_MAP[message.msgtype],
|
|
data,
|
|
filename,
|
|
width=width,
|
|
height=height,
|
|
ext=ext[1:] if ext else "",
|
|
)
|
|
except CommandException as e:
|
|
self.log.debug(f"Error uploading media for Matrix message {event_id}: {e!s}")
|
|
raise
|
|
await self._make_dbm(event_id, log_id)
|
|
self.log.debug(f"Handled Matrix message {event_id} -> {log_id}")
|
|
sender.send_remote_checkpoint(
|
|
MessageSendCheckpointStatus.SUCCESS,
|
|
event_id,
|
|
self.mxid,
|
|
EventType.ROOM_MESSAGE,
|
|
message.msgtype,
|
|
)
|
|
|
|
async def _handle_matrix_location(
|
|
self, sender: u.User, message: LocationMessageEventContent
|
|
) -> str:
|
|
pass
|
|
# TODO
|
|
# match = geo_uri_regex.fullmatch(message.geo_uri)
|
|
# return await self.thread_for(sender).send_pinned_location(float(match.group(1)),
|
|
# float(match.group(2)))
|
|
|
|
async def handle_matrix_redaction(
|
|
self, sender: u.User, event_id: EventID, redaction_event_id: EventID
|
|
) -> None:
|
|
try:
|
|
await self._handle_matrix_redaction(sender, event_id)
|
|
except Exception as e:
|
|
self.log.error(
|
|
f"Failed to handle Matrix redaction {redaction_event_id}: {e}",
|
|
exc_info=not isinstance(e, NotImplementedError),
|
|
)
|
|
sender.send_remote_checkpoint(
|
|
self._status_from_exception(e),
|
|
redaction_event_id,
|
|
self.mxid,
|
|
EventType.ROOM_REDACTION,
|
|
error=e,
|
|
)
|
|
if not isinstance(e, NotImplementedError):
|
|
await self._send_bridge_error(str(e), thing="redaction")
|
|
else:
|
|
await self._send_delivery_receipt(redaction_event_id)
|
|
sender.send_remote_checkpoint(
|
|
MessageSendCheckpointStatus.SUCCESS,
|
|
redaction_event_id,
|
|
self.mxid,
|
|
EventType.ROOM_REDACTION,
|
|
)
|
|
|
|
async def _handle_matrix_redaction(self, sender: u.User, event_id: EventID) -> None:
|
|
sender, _ = await self.get_relay_sender(sender, f"redaction {event_id}")
|
|
if not sender:
|
|
raise Exception("not logged in")
|
|
elif not sender.is_connected:
|
|
raise Exception("not connected to KakaoTalk chats")
|
|
message = await DBMessage.get_by_mxid(event_id, self.mxid)
|
|
if message:
|
|
if not message.ktid:
|
|
raise NotImplementedError("Tried to redact message whose ktid is unknown")
|
|
try:
|
|
await message.delete()
|
|
await sender.client.delete_chat(self.channel_props, message.ktid)
|
|
except Exception as e:
|
|
self.log.exception(f"Unsend failed: {e}")
|
|
raise
|
|
return
|
|
|
|
raise NotImplementedError("Only message redactions are supported")
|
|
|
|
""" TODO
|
|
reaction = await DBReaction.get_by_mxid(event_id, self.mxid)
|
|
if reaction:
|
|
try:
|
|
await reaction.delete()
|
|
await sender.client.react(reaction.kt_msgid, None)
|
|
except Exception as e:
|
|
self.log.exception(f"Removing reaction failed: {e}")
|
|
raise
|
|
return
|
|
|
|
raise NotImplementedError("Only message and reaction redactions are supported")
|
|
"""
|
|
|
|
""" TODO
|
|
async def handle_matrix_reaction(
|
|
self, sender: u.User, event_id: EventID, reacting_to: EventID, reaction: str
|
|
) -> None:
|
|
pass
|
|
"""
|
|
|
|
async def handle_matrix_state_event(self, sender: u.User, evt: StateEvent) -> None:
|
|
try:
|
|
handler: StateEventHandler = self._STATE_EVENT_HANDLER_MAP[evt.type]
|
|
except KeyError:
|
|
# Misses should be guarded by supports_state_event, but handle this just in case
|
|
self.log.error(f"Skipping Matrix state event {evt.event_id} of unsupported type {evt.type}")
|
|
return
|
|
if not self.is_open:
|
|
self.log.info(f"Not bridging f{handler.action_name} change of portal for non-open channel")
|
|
return
|
|
try:
|
|
effective_sender, _ = await self.get_relay_sender(sender, f"{handler.action_name} {evt.event_id}")
|
|
if effective_sender:
|
|
await handler.apply(self, effective_sender, evt.prev_content, evt.content)
|
|
except Exception as e:
|
|
self.log.error(
|
|
f"Failed to handle Matrix {handler.action_name} {evt.event_id}: {e}",
|
|
exc_info=not isinstance(e, NotImplementedError),
|
|
)
|
|
sender.send_remote_checkpoint(
|
|
self._status_from_exception(e),
|
|
evt.event_id,
|
|
self.mxid,
|
|
evt.type,
|
|
error=e,
|
|
)
|
|
change = f"{handler.action_name} change"
|
|
await self._send_bridge_error(
|
|
f"{e}. Reverting the {change}...",
|
|
thing=change
|
|
)
|
|
# NOTE Redacting instead doesn't work
|
|
await handler.revert(self, evt.prev_content)
|
|
else:
|
|
await self._send_delivery_receipt(evt.event_id)
|
|
sender.send_remote_checkpoint(
|
|
MessageSendCheckpointStatus.SUCCESS,
|
|
evt.event_id,
|
|
self.mxid,
|
|
evt.type,
|
|
)
|
|
|
|
async def _handle_matrix_power_levels(
|
|
self,
|
|
sender: u.User,
|
|
prev_content: PowerLevelStateEventContent,
|
|
content: PowerLevelStateEventContent,
|
|
) -> None:
|
|
ktid_perms: dict[Long, OpenChannelUserPerm] = {}
|
|
user_power_levels: dict[UserID, int] = {}
|
|
for user_id, level in content.users.items():
|
|
if level == prev_content.get_user_level(user_id):
|
|
continue
|
|
ktid = None
|
|
mxid = None
|
|
puppet = await p.Puppet.get_by_mxid(user_id)
|
|
user = await u.User.get_by_mxid(user_id) if not puppet else None
|
|
if puppet:
|
|
ktid = puppet.ktid
|
|
user = await u.User.get_by_ktid(ktid)
|
|
if user:
|
|
mxid = user.mxid
|
|
elif user:
|
|
ktid = user.ktid
|
|
puppet = await p.Puppet.get_by_ktid(ktid)
|
|
if puppet:
|
|
mxid = puppet.mxid
|
|
if ktid is not None:
|
|
ktid_perms[ktid] = TO_PERM_MAP.get(level)
|
|
if mxid is not None:
|
|
user_power_levels[mxid] = level
|
|
if ktid_perms:
|
|
if sender and sender.is_connected:
|
|
if user_power_levels:
|
|
await self._set_user_power_levels(await sender.get_puppet(), user_power_levels)
|
|
ok = True
|
|
results = await asyncio.gather(*[
|
|
sender.client.send_perm(self.channel_props, ktid, perm)
|
|
for ktid, perm in ktid_perms.items()
|
|
], return_exceptions=True)
|
|
for e in filter(lambda x: isinstance(x, Exception), results):
|
|
ok = False
|
|
self.log.exception(e)
|
|
if not ok:
|
|
self.log.info("Failed to send all perms, so re-syncing all participant info")
|
|
await self._update_participants(sender)
|
|
else:
|
|
raise Exception(
|
|
"Only users connected to KakaoTalk can set power levels of KakaoTalk users"
|
|
)
|
|
|
|
async def _revert_matrix_power_levels(self, prev_content: PowerLevelStateEventContent) -> None:
|
|
managed_power_levels: dict[UserID, int] = {}
|
|
for user_id, level in prev_content.users.items():
|
|
if await p.Puppet.get_by_mxid(user_id) or await u.User.get_by_mxid(user_id):
|
|
managed_power_levels[user_id] = level
|
|
await self._set_user_power_levels(None, managed_power_levels)
|
|
|
|
async def _handle_matrix_room_name(
|
|
self,
|
|
sender: u.User,
|
|
prev_content: RoomNameStateEventContent,
|
|
content: RoomNameStateEventContent,
|
|
) -> None:
|
|
if content.name == prev_content.name:
|
|
return
|
|
if not (sender and sender.is_connected):
|
|
raise Exception(
|
|
"Only users connected to KakaoTalk can set the name of a KakaoTalk channel"
|
|
)
|
|
await sender.client.set_channel_name(self.channel_props, content.name)
|
|
self.name = content.name
|
|
self.name_set = True
|
|
await self.save()
|
|
|
|
async def _revert_matrix_room_name(self, prev_content: RoomNameStateEventContent) -> None:
|
|
await self.main_intent.set_room_name(self.mxid, prev_content.name)
|
|
|
|
async def _handle_matrix_room_topic(
|
|
self,
|
|
sender: u.User,
|
|
prev_content: RoomTopicStateEventContent,
|
|
content: RoomTopicStateEventContent,
|
|
) -> None:
|
|
if content.topic == prev_content.topic:
|
|
return
|
|
if not (sender and sender.is_connected):
|
|
raise Exception(
|
|
"Only users connected to KakaoTalk can set the description of a KakaoTalk channel"
|
|
)
|
|
await sender.client.set_channel_description(self.channel_props, content.topic)
|
|
self.description = content.topic
|
|
self.topic_set = True
|
|
await self.save()
|
|
|
|
async def _revert_matrix_room_topic(self, prev_content: RoomTopicStateEventContent) -> None:
|
|
await self.main_intent.set_room_topic(self.mxid, prev_content.topic or "")
|
|
|
|
async def _handle_matrix_room_avatar(
|
|
self,
|
|
sender: u.User,
|
|
prev_content: RoomAvatarStateEventContent,
|
|
content: RoomAvatarStateEventContent,
|
|
) -> None:
|
|
if content.url == prev_content.url:
|
|
return
|
|
if not (sender and sender.is_connected):
|
|
raise Exception(
|
|
"Only users connected to KakaoTalk can set the photo of a KakaoTalk channel"
|
|
)
|
|
raise NotImplementedError("Changing the room avatar is not supported by the KakaoTalk bridge.")
|
|
""" TODO
|
|
photo_url = str(self.main_intent.api.get_download_url(content.url))
|
|
await sender.client.set_channel_photo(self.channel_props, photo_url)
|
|
self.photo_id = photo_url
|
|
self.avatar_url = content.url
|
|
self.avatar_set = True
|
|
await self.save()
|
|
"""
|
|
|
|
async def _revert_matrix_room_avatar(self, prev_content: RoomAvatarStateEventContent) -> None:
|
|
await self.main_intent.set_room_avatar(self.mxid, prev_content.url)
|
|
|
|
async def handle_matrix_leave(self, user: u.User) -> None:
|
|
if self.is_direct:
|
|
self.log.info(f"{user.mxid} left private chat portal with {self.ktid}")
|
|
if user.ktid == self.kt_receiver:
|
|
self.log.info(
|
|
f"{user.mxid} was the recipient of this portal. Cleaning up and deleting..."
|
|
)
|
|
await self.cleanup_and_delete()
|
|
else:
|
|
self.log.debug(f"{user.mxid} left portal to {self.ktid}")
|
|
|
|
# endregion
|
|
# region KakaoTalk event handling
|
|
|
|
async def _bridge_own_message_pm(
|
|
self, source: u.User, sender: p.Puppet, mid: str, invite: bool = True
|
|
) -> bool:
|
|
if self.is_direct and sender.ktid == source.ktid and not sender.is_real_user:
|
|
if self.invite_own_puppet_to_pm and invite:
|
|
await self.main_intent.invite_user(self.mxid, sender.mxid)
|
|
elif (
|
|
await self.az.state_store.get_membership(self.mxid, sender.mxid) != Membership.JOIN
|
|
):
|
|
self.log.warning(
|
|
f"Ignoring own {mid} in private chat because own puppet is not in room."
|
|
)
|
|
return False
|
|
return True
|
|
|
|
async def _add_kakaotalk_reply(
|
|
self, content: MessageEventContent, reply_to: ReplyAttachment
|
|
) -> None:
|
|
message = await DBMessage.get_by_ktid(reply_to.src_logId, *self.ktid_full)
|
|
if not message:
|
|
self.log.warning(
|
|
f"Couldn't find reply target {reply_to.src_logId} to bridge reply metadata to Matrix"
|
|
)
|
|
return
|
|
|
|
content.set_reply(message.mxid)
|
|
if not isinstance(content, TextMessageEventContent):
|
|
return
|
|
|
|
try:
|
|
evt = await self.main_intent.get_event(message.mx_room, message.mxid)
|
|
except (MNotFound, MForbidden):
|
|
evt = None
|
|
if not evt:
|
|
return
|
|
|
|
if evt.type == EventType.ROOM_ENCRYPTED:
|
|
try:
|
|
evt = await self.matrix.e2ee.decrypt(evt, wait_session_timeout=0)
|
|
except SessionNotFound:
|
|
return
|
|
|
|
if isinstance(evt.content, TextMessageEventContent):
|
|
evt.content.trim_reply_fallback()
|
|
|
|
content.set_reply(evt)
|
|
|
|
async def handle_kakaotalk_chat(
|
|
self,
|
|
source: u.User,
|
|
sender: p.Puppet,
|
|
chat: Chatlog,
|
|
) -> None:
|
|
try:
|
|
await self._handle_kakaotalk_chat(source, sender, chat)
|
|
except Exception:
|
|
self.log.exception(
|
|
"Error handling KakaoTalk chat %s",
|
|
chat.logId,
|
|
)
|
|
|
|
async def _handle_kakaotalk_chat(
|
|
self,
|
|
source: u.User,
|
|
sender: p.Puppet,
|
|
chat: Chatlog,
|
|
) -> None:
|
|
# TODO Backfill!! This avoids timing conflicts on startup sync
|
|
self.log.debug(f"Handling KakaoTalk event {chat.logId}")
|
|
if not self.mxid:
|
|
mxid = await self.create_matrix_room(source)
|
|
if not mxid:
|
|
# Failed to create
|
|
return
|
|
if not await self._bridge_own_message_pm(source, sender, f"chat {chat.logId}"):
|
|
return
|
|
intent = sender.intent_for(self)
|
|
if (
|
|
self._backfill_leave is not None
|
|
and self.ktid != sender.ktid
|
|
and intent != sender.intent
|
|
and intent not in self._backfill_leave
|
|
):
|
|
self.log.debug("Adding %s's default puppet to room for backfilling", sender.mxid)
|
|
await self.main_intent.invite_user(self.mxid, intent.mxid)
|
|
await intent.ensure_joined(self.mxid)
|
|
self._backfill_leave.add(intent)
|
|
|
|
handler = self._CHAT_TYPE_HANDLER_MAP.get(chat.type, Portal._handle_kakaotalk_unsupported)
|
|
event_ids = [
|
|
event_id for event_id in
|
|
await handler(
|
|
self,
|
|
source=source,
|
|
intent=intent,
|
|
attachment=chat.attachment,
|
|
timestamp=chat.sendAt,
|
|
chat_text=chat.text,
|
|
chat_type=chat.type,
|
|
)
|
|
if event_id
|
|
]
|
|
if not event_ids:
|
|
self.log.warning(f"Unhandled KakaoTalk chat {chat.logId}")
|
|
return
|
|
self.log.debug(f"Handled KakaoTalk chat {chat.logId} -> {event_ids}")
|
|
# TODO Might have to handle remote reactions on messages created by bulk_create
|
|
await DBMessage.bulk_create(
|
|
ktid=chat.logId,
|
|
kt_channel=self.ktid,
|
|
kt_receiver=self.kt_receiver,
|
|
mx_room=self.mxid,
|
|
timestamp=chat.sendAt,
|
|
event_ids=event_ids,
|
|
)
|
|
await self._send_delivery_receipt(event_ids[-1])
|
|
|
|
async def _handle_kakaotalk_unsupported(
|
|
self,
|
|
intent: IntentAPI,
|
|
timestamp: int,
|
|
chat_text: str | None,
|
|
chat_type: ChatType,
|
|
**_
|
|
) -> list[EventID]:
|
|
try:
|
|
type_str = KnownChatType(chat_type).name.lower()
|
|
except ValueError:
|
|
type_str = str(chat_type)
|
|
self.log.warning("No handler for chat type \"%s\" (%s)",
|
|
type_str,
|
|
f"text = \"{chat_text}\"" if chat_text is not None else "no text",
|
|
)
|
|
if chat_text:
|
|
events = await self._handle_kakaotalk_text(
|
|
intent=intent,
|
|
attachment=None,
|
|
timestamp=timestamp,
|
|
chat_text=chat_text,
|
|
)
|
|
else:
|
|
events = []
|
|
content = TextMessageEventContent(
|
|
msgtype=MessageType.NOTICE,
|
|
body=f"\u26a0 Unbridgeable message ({type_str})",
|
|
)
|
|
if events:
|
|
content.set_reply(events[-1])
|
|
events.append(await self._send_message(intent, content, timestamp=timestamp))
|
|
return events
|
|
|
|
async def _handle_kakaotalk_feed(
|
|
self,
|
|
timestamp: int,
|
|
chat_text: str | None,
|
|
**_
|
|
) -> list[EventID]:
|
|
self.log.info("Got feed message at %s: %s", timestamp, chat_text or "none")
|
|
return []
|
|
|
|
async def _handle_kakaotalk_deleted(
|
|
self,
|
|
timestamp: int,
|
|
**_
|
|
) -> list[EventID]:
|
|
self.log.info(f"Got deleted (?) message at {timestamp}")
|
|
return []
|
|
|
|
async def _handle_kakaotalk_text(
|
|
self,
|
|
intent: IntentAPI,
|
|
attachment: Attachment | None,
|
|
timestamp: int,
|
|
chat_text: str | None,
|
|
**_
|
|
) -> list[EventID]:
|
|
content = await kakaotalk_to_matrix(chat_text, attachment.mentions if attachment else None)
|
|
return [await self._send_message(intent, content, timestamp=timestamp)]
|
|
|
|
async def _handle_kakaotalk_reply(
|
|
self,
|
|
intent: IntentAPI,
|
|
attachment: ReplyAttachment,
|
|
timestamp: int,
|
|
chat_text: str,
|
|
**_
|
|
) -> list[EventID]:
|
|
content = await kakaotalk_to_matrix(chat_text, attachment.mentions)
|
|
await self._add_kakaotalk_reply(content, attachment)
|
|
return [await self._send_message(intent, content, timestamp=timestamp)]
|
|
|
|
async def _handle_kakaotalk_photo(self, **kwargs) -> list[EventID]:
|
|
return [await self._handle_kakaotalk_uniphoto(**kwargs)]
|
|
|
|
async def _handle_kakaotalk_multiphoto(
|
|
self,
|
|
attachment: MultiPhotoAttachment,
|
|
**kwargs
|
|
) -> list[EventID]:
|
|
# TODO Upload media concurrently, but post messages sequentially
|
|
return [
|
|
await self._handle_kakaotalk_uniphoto(
|
|
attachment=PhotoAttachment(
|
|
shout=attachment.shout,
|
|
mentions=attachment.mentions,
|
|
urls=attachment.urls,
|
|
url=attachment.imageUrls[i],
|
|
s=attachment.sl[i],
|
|
k=attachment.kl[i],
|
|
w=attachment.wl[i],
|
|
h=attachment.hl[i],
|
|
thumbnailUrl=attachment.thumbnailUrls[i],
|
|
thumbnailWidth=attachment.thumbnailWidths[i],
|
|
thumbnailHeight=attachment.thumbnailHeights[i],
|
|
cs=attachment.csl[i],
|
|
mt=attachment.mtl[i],
|
|
),
|
|
**kwargs
|
|
)
|
|
for i in range(len(attachment.imageUrls))
|
|
]
|
|
|
|
def _handle_kakaotalk_uniphoto(
|
|
self,
|
|
attachment: PhotoAttachment,
|
|
**kwargs
|
|
) -> Awaitable[EventID]:
|
|
return self._handle_kakaotalk_media(
|
|
attachment,
|
|
ImageInfo(
|
|
mimetype=attachment.mt,
|
|
size=attachment.s,
|
|
width=attachment.w,
|
|
height=attachment.h,
|
|
),
|
|
MessageType.IMAGE,
|
|
**kwargs
|
|
)
|
|
|
|
async def _handle_kakaotalk_video(
|
|
self,
|
|
attachment: VideoAttachment,
|
|
**kwargs
|
|
) -> list[EventID]:
|
|
return [await self._handle_kakaotalk_media(
|
|
attachment,
|
|
VideoInfo(
|
|
duration=attachment.d,
|
|
width=attachment.w,
|
|
height=attachment.h,
|
|
),
|
|
MessageType.VIDEO,
|
|
**kwargs
|
|
)]
|
|
|
|
async def _handle_kakaotalk_audio(
|
|
self,
|
|
attachment: AudioAttachment,
|
|
**kwargs
|
|
) -> list[EventID]:
|
|
return [await self._handle_kakaotalk_media(
|
|
attachment,
|
|
AudioInfo(
|
|
size=attachment.s,
|
|
duration=attachment.d,
|
|
),
|
|
MessageType.AUDIO,
|
|
**kwargs
|
|
)]
|
|
|
|
async def _handle_kakaotalk_media(
|
|
self,
|
|
attachment: MediaAttachment | AudioAttachment,
|
|
info: MediaInfo,
|
|
msgtype: MessageType,
|
|
*,
|
|
source: u.User,
|
|
intent: IntentAPI,
|
|
timestamp: int,
|
|
chat_text: str | None,
|
|
**_
|
|
) -> EventID:
|
|
mxc, additional_info, decryption_info = await self._reupload_kakaotalk_file_from_url(
|
|
attachment.url,
|
|
source,
|
|
intent,
|
|
mimetype=info.mimetype,
|
|
encrypt=self.encrypted,
|
|
find_size=False,
|
|
)
|
|
info.size = additional_info.size
|
|
info.mimetype = additional_info.mimetype
|
|
content = MediaMessageEventContent(
|
|
url=mxc, file=decryption_info, msgtype=msgtype, body=chat_text or "", info=info
|
|
)
|
|
return await self._send_message(intent, content, timestamp=timestamp)
|
|
|
|
async def _handle_kakaotalk_file(
|
|
self,
|
|
source: u.User,
|
|
intent: IntentAPI,
|
|
attachment: FileAttachment,
|
|
timestamp: int,
|
|
chat_text: str | None,
|
|
**_
|
|
) -> list[EventID]:
|
|
data = await source.client.download_file(self.channel_props, attachment.k)
|
|
mxc, info, decryption_info = await self._reupload_kakaotalk_file_from_bytes(
|
|
data,
|
|
intent,
|
|
filename=attachment.name,
|
|
encrypt=self.encrypted,
|
|
)
|
|
content = MediaMessageEventContent(
|
|
url=mxc, file=decryption_info, msgtype=MessageType.FILE, body=chat_text or "", info=info
|
|
)
|
|
return [await self._send_message(intent, content, timestamp=timestamp)]
|
|
|
|
async def handle_kakaotalk_chat_delete(
|
|
self,
|
|
sender: p.Puppet,
|
|
chat_id: Long,
|
|
timestamp: int,
|
|
) -> None:
|
|
if not self.mxid:
|
|
return
|
|
for message in await DBMessage.get_all_by_ktid(chat_id, *self.ktid_full):
|
|
try:
|
|
await sender.intent_for(self).redact(
|
|
message.mx_room, message.mxid, timestamp=timestamp
|
|
)
|
|
except MForbidden:
|
|
await self.main_intent.redact(message.mx_room, message.mxid, timestamp=timestamp)
|
|
await message.delete()
|
|
|
|
async def handle_kakaotalk_chat_read(self, source: u.User, sender: p.Puppet, chat_id: Long) -> None:
|
|
if not self.mxid:
|
|
return
|
|
msg = await DBMessage.get_closest_before(self.ktid, self.kt_receiver, chat_id)
|
|
if not msg:
|
|
return
|
|
if not await self._bridge_own_message_pm(source, sender, "read receipt", invite=False):
|
|
return
|
|
# NOTE No need for timestamp when the read receipt happened, since this is only for live ones
|
|
await sender.intent_for(self).mark_read(msg.mx_room, msg.mxid)
|
|
self.log.debug(
|
|
f"Handled KakaoTalk read receipt from {sender.ktid} up to {chat_id}/{msg.mxid}"
|
|
)
|
|
|
|
async def handle_kakaotalk_perm_changed(
|
|
self, source: u.User, sender: p.Puppet, user_id: Long, perm: OpenChannelUserPerm
|
|
) -> None:
|
|
user_power_levels: dict[UserID, int] = {}
|
|
await self._update_mapped_ktid_power_levels(user_power_levels, user_id, perm)
|
|
await self._set_user_power_levels(sender, user_power_levels)
|
|
|
|
async def handle_kakaotalk_user_join(
|
|
self, source: u.User, user: p.Puppet
|
|
) -> None:
|
|
# TODO Check if a KT user can join as an admin / room owner
|
|
await self.main_intent.invite_user(self.mxid, user.mxid)
|
|
await user.intent_for(self).join_room_by_id(self.mxid)
|
|
if not user.name:
|
|
self.schedule_resync(source, user)
|
|
|
|
async def handle_kakaotalk_user_left(
|
|
self, source: u.User, sender: p.Puppet | None, removed: p.Puppet
|
|
) -> None:
|
|
sender_intent = sender.intent_for(self) if sender else self.main_intent
|
|
if sender == removed:
|
|
removed_intent = removed.intent_for(self)
|
|
if removed_intent != self.main_intent:
|
|
await removed_intent.leave_room(self.mxid)
|
|
if not removed.is_real_user:
|
|
user = await u.User.get_by_ktid(removed.ktid)
|
|
if user:
|
|
await self.main_intent.kick_user(self.mxid, user.mxid, "Left channel from KakaoTalk")
|
|
else:
|
|
for removed_mxid in (r.mxid for r in (
|
|
removed,
|
|
await u.User.get_by_ktid(removed.ktid) if not removed.is_real_user else None
|
|
) if r):
|
|
try:
|
|
await sender_intent.ban_user(
|
|
self.mxid, removed_mxid, None if sender else "Kicked by channel admin"
|
|
)
|
|
except MForbidden:
|
|
if not sender:
|
|
raise
|
|
await self.main_intent.ban_user(
|
|
self.mxid, removed_mxid, reason=f"Kicked by {sender.name}"
|
|
)
|
|
# TODO Clean and delete if removed is real user and portal is direct / not open
|
|
|
|
# TODO Find when or if there is a listener for this
|
|
# TODO Confirm whether this can refer to any user that was kicked, or only to the current user
|
|
async def handle_kakaotalk_user_unkick(
|
|
self, source: u.User, sender: p.Puppet | None, unkicked: p.Puppet
|
|
) -> None:
|
|
assert sender != unkicked, f"Puppet for {unkicked.mxid} tried to unkick itself"
|
|
sender_intent = sender.intent_for(self) if sender else self.main_intent
|
|
for unkicked_mxid in (r.mxid for r in (
|
|
unkicked,
|
|
await u.User.get_by_ktid(unkicked.ktid) if not unkicked.is_real_user else None
|
|
) if r):
|
|
# NOTE KakaoTalk kick = Matrix ban
|
|
try:
|
|
await sender_intent.unban_user(self.mxid, unkicked_mxid)
|
|
except MForbidden:
|
|
if not sender:
|
|
raise
|
|
await self.main_intent.unban_user(self.mxid, unkicked_mxid)
|
|
|
|
|
|
# endregion
|
|
|
|
async def backfill(self, source: u.User, is_initial: bool, channel_info: ChannelInfo) -> None:
|
|
limit = (
|
|
self.config["bridge.backfill.initial_limit"]
|
|
if is_initial
|
|
else self.config["bridge.backfill.missed_limit"]
|
|
)
|
|
if limit == 0:
|
|
return
|
|
elif limit < 0:
|
|
limit = None
|
|
last_log_id = None
|
|
if not is_initial and channel_info.lastChatLog:
|
|
last_log_id = channel_info.lastChatLog.logId
|
|
most_recent = await DBMessage.get_most_recent(self.ktid, self.kt_receiver)
|
|
if most_recent and is_initial:
|
|
self.log.debug("Not backfilling %s: already bridged messages found", self.ktid_log)
|
|
# TODO Should this be removed? With it, can't sync an empty portal!
|
|
#elif (not most_recent or not most_recent.timestamp) and not is_initial:
|
|
# self.log.debug("Not backfilling %s: no most recent message found", self.ktid_log)
|
|
elif last_log_id and most_recent and int(most_recent.ktid or 0) >= int(last_log_id):
|
|
self.log.debug(
|
|
"Not backfilling %s: last activity is equal to most recent bridged "
|
|
"message (%s >= %s)",
|
|
self.ktid_log,
|
|
most_recent.ktid,
|
|
last_log_id,
|
|
)
|
|
else:
|
|
with self.backfill_lock:
|
|
await self._backfill(
|
|
source,
|
|
limit,
|
|
most_recent.ktid if most_recent else None,
|
|
)
|
|
|
|
async def _backfill(
|
|
self,
|
|
source: u.User,
|
|
limit: int | None,
|
|
after_log_id: Long | None,
|
|
) -> None:
|
|
self.log.debug(f"Backfilling history through {source.mxid}")
|
|
self.log.debug(f"Fetching {f'up to {limit}' if limit else 'all'} messages through {source.ktid}")
|
|
chats = await source.client.get_chats(
|
|
self.channel_props,
|
|
after_log_id,
|
|
limit,
|
|
)
|
|
if not chats:
|
|
self.log.debug("Didn't get any messages from server")
|
|
return
|
|
self.log.debug(f"Got {len(chats)} message{'s' if len(chats) > 1 else ''} from server")
|
|
self._backfill_leave = set()
|
|
async with NotificationDisabler(self.mxid, source):
|
|
for chat in chats:
|
|
puppet = await p.Puppet.get_by_ktid(chat.sender.userId)
|
|
await self.handle_kakaotalk_chat(source, puppet, chat)
|
|
for intent in self._backfill_leave:
|
|
self.log.trace("Leaving room with %s post-backfill", intent.mxid)
|
|
await intent.leave_room(self.mxid)
|
|
self.log.info("Backfilled %d messages through %s", len(chats), source.mxid)
|
|
self._sync_read_receipts(source)
|
|
|
|
# region Database getters
|
|
|
|
async def postinit(self) -> None:
|
|
self.by_ktid[self.ktid_full] = self
|
|
if self.mxid:
|
|
self.by_mxid[self.mxid] = self
|
|
if not self.is_direct:
|
|
self._main_intent = self.az.intent
|
|
else:
|
|
# TODO Save kt_sender in DB instead? Depends on if DM channels are shared...
|
|
user = await u.User.get_by_ktid(self.kt_receiver)
|
|
assert user, f"Found no user for this portal's receiver of {self.kt_receiver}"
|
|
if self.kt_type == KnownChannelType.MemoChat:
|
|
self._kt_sender = user.ktid
|
|
else:
|
|
# NOTE This throws if the user isn't connected--good!
|
|
# Nothing should init a portal for a disconnected user.
|
|
participants = await user.client.get_participants(self.channel_props)
|
|
self._kt_sender = participants[
|
|
0 if participants[0].userId != user.ktid else 1
|
|
].userId
|
|
self._main_intent = (await p.Puppet.get_by_ktid(self._kt_sender)).default_mxid_intent
|
|
|
|
@classmethod
|
|
@async_getter_lock
|
|
async def get_by_mxid(cls, mxid: RoomID) -> Portal | None:
|
|
try:
|
|
return cls.by_mxid[mxid]
|
|
except KeyError:
|
|
pass
|
|
|
|
portal = cast(cls, await super().get_by_mxid(mxid))
|
|
if portal:
|
|
await portal.postinit()
|
|
return portal
|
|
|
|
return None
|
|
|
|
@classmethod
|
|
@async_getter_lock
|
|
async def get_by_ktid(
|
|
cls,
|
|
ktid: int,
|
|
*,
|
|
kt_receiver: int = 0,
|
|
create: bool = True,
|
|
kt_type: ChannelType | None = None,
|
|
) -> Portal | None:
|
|
# TODO Find out if direct channels are shared. If so, don't need kt_receiver!
|
|
if kt_type:
|
|
kt_receiver = kt_receiver if KnownChannelType.is_direct(kt_type) else 0
|
|
ktid_full = (ktid, kt_receiver)
|
|
try:
|
|
return cls.by_ktid[ktid_full]
|
|
except KeyError:
|
|
pass
|
|
|
|
portal = cast(cls, await super().get_by_ktid(ktid, kt_receiver))
|
|
if portal:
|
|
await portal.postinit()
|
|
return portal
|
|
|
|
if kt_type and create:
|
|
portal = cls(ktid=ktid, kt_receiver=kt_receiver, kt_type=kt_type)
|
|
await portal.insert()
|
|
await portal.postinit()
|
|
return portal
|
|
|
|
return None
|
|
|
|
@classmethod
|
|
async def get_all_by_receiver(cls, kt_receiver: int) -> AsyncGenerator[Portal, None]:
|
|
portals = await super().get_all_by_receiver(kt_receiver)
|
|
portal: Portal
|
|
for portal in portals:
|
|
try:
|
|
yield cls.by_ktid[(portal.ktid, portal.kt_receiver)]
|
|
except KeyError:
|
|
await portal.postinit()
|
|
yield portal
|
|
|
|
@classmethod
|
|
async def all(cls) -> AsyncGenerator[Portal, None]:
|
|
portals = await super().all()
|
|
portal: Portal
|
|
for portal in portals:
|
|
try:
|
|
yield cls.by_ktid[(portal.ktid, portal.kt_receiver)]
|
|
except KeyError:
|
|
await portal.postinit()
|
|
yield portal
|
|
|
|
# endregion
|