Compare commits

...

8 Commits

Author SHA1 Message Date
861e3ff30d Update roadmap 2022-04-09 04:14:02 -04:00
297697973f Catch a possible non-cancel exception when terminating Node module 2022-04-09 04:14:02 -04:00
61d9a60704 Outgoing message redactions 2022-04-09 04:14:02 -04:00
01a89508f6 Remove outbound typing listeners
Since KakaoTalk doesn't support typing notifications
2022-04-09 04:14:02 -04:00
60b115bd38 Rename remote->kakaotalk and message->chat 2022-04-09 04:04:10 -04:00
f7d889486a Inbound message deletion/hiding
Treating hiding as equivalent to deletion
2022-04-09 04:04:10 -04:00
075bf3e60f Better tracking of client event handlers 2022-04-09 04:04:10 -04:00
aee66976f6 Style improvements in client.js 2022-04-08 19:01:32 -04:00
8 changed files with 275 additions and 160 deletions

View File

@ -14,10 +14,8 @@
* [x] In DMs * [x] In DMs
* [ ] In multi-user rooms * [ ] In multi-user rooms
* [x] Mentions * [x] Mentions
* [ ] Message redactions * [x] Message redactions<sup>[1]</sup>
* [ ] Message reactions * [ ] Message reactions
* [ ] Presence
* [ ] Typing notifications
* [ ] Read receipts * [ ] Read receipts
* [ ] Power level * [ ] Power level
* [ ] Membership actions * [ ] Membership actions
@ -40,10 +38,9 @@
* [ ] Locations * [ ] Locations
* [x] Replies * [x] Replies
* [x] Mentions * [x] Mentions
* [x] Message deletion/hiding
* [ ] Message reactions * [ ] Message reactions
* [x] Message history * [x] Message history
* [ ] Presence
* [ ] Typing notifications
* [ ] Read receipts * [ ] Read receipts
* [ ] Admin status * [ ] Admin status
* [ ] Membership actions * [ ] Membership actions
@ -71,3 +68,5 @@
* [ ] For existing long-idled KakaoTalk channels * [ ] For existing long-idled KakaoTalk channels
* [ ] For new KakaoTalk channels * [ ] For new KakaoTalk channels
* [x] Option to use own Matrix account for messages sent from other KakaoTalk clients * [x] Option to use own Matrix account for messages sent from other KakaoTalk clients
<sup>[1]</sup> Only recently-sent messages can be deleted

View File

@ -22,7 +22,7 @@ with any other potential backend.
from __future__ import annotations from __future__ import annotations
from typing import TYPE_CHECKING, cast, ClassVar, Type, Optional, Union from typing import TYPE_CHECKING, cast, Type, Optional, Union
import asyncio import asyncio
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
import logging import logging
@ -35,7 +35,7 @@ from yarl import URL
from mautrix.util.logging import TraceLogger from mautrix.util.logging import TraceLogger
from ...config import Config from ...config import Config
from ...rpc import RPCClient from ...rpc import EventHandler, RPCClient
from ..types.api.struct.profile import ProfileReqStruct, ProfileStruct from ..types.api.struct.profile import ProfileReqStruct, ProfileStruct
from ..types.api.struct import FriendListStruct from ..types.api.struct import FriendListStruct
@ -136,11 +136,13 @@ class Client:
_rpc_disconnection_task: asyncio.Task | None _rpc_disconnection_task: asyncio.Task | None
http: ClientSession http: ClientSession
log: TraceLogger log: TraceLogger
_handler_methods: list[str]
def __init__(self, user: u.User, log: Optional[TraceLogger] = None): def __init__(self, user: u.User, log: Optional[TraceLogger] = None):
"""Create a per-user client object for user-specific client functionality.""" """Create a per-user client object for user-specific client functionality."""
self.user = user self.user = user
self._rpc_disconnection_task = None self._rpc_disconnection_task = None
self._handler_methods = []
# TODO Let the Node backend use a proxy too! # TODO Let the Node backend use a proxy too!
connector = None connector = None
@ -299,7 +301,7 @@ class Client:
await self._rpc_client.request("get_memo_ids", mxid=self.user.mxid) await self._rpc_client.request("get_memo_ids", mxid=self.user.mxid)
) )
async def send_message( async def send_chat(
self, self,
channel_props: ChannelProps, channel_props: ChannelProps,
text: str, text: str,
@ -341,6 +343,17 @@ class Client:
is_secret=True is_secret=True
) )
async def delete_chat(
self,
channel_props: ChannelProps,
chat_id: Long,
) -> None:
return await self._api_user_request_void(
"delete_chat",
channel_props=channel_props.serialize(),
chat_id=chat_id.serialize(),
)
# TODO Combine these into one # TODO Combine these into one
@ -373,11 +386,20 @@ class Client:
# region listeners # region listeners
async def _on_message(self, data: dict[str, JSON]) -> None: async def _on_chat(self, data: dict[str, JSON]) -> None:
await self.user.on_message( await self.user.on_chat(
Chatlog.deserialize(data["chatlog"]), Chatlog.deserialize(data["chatlog"]),
Long.deserialize(data["channelId"]), Long.deserialize(data["channelId"]),
data["channelType"], str(data["channelType"]),
)
async def _on_chat_deleted(self, data: dict[str, JSON]) -> None:
await self.user.on_chat_deleted(
Long.deserialize(data["chatId"]),
Long.deserialize(data["senderId"]),
int(data["timestamp"]),
Long.deserialize(data["channelId"]),
str(data["channelType"]),
) )
""" TODO """ TODO
@ -403,18 +425,20 @@ class Client:
def _start_listen(self) -> None: def _start_listen(self) -> None:
# TODO Automate this somehow, like with a fancy enum self._add_event_handler("chat", self._on_chat)
self._rpc_client.set_event_handlers(self._get_user_cmd("chat"), [self._on_message]) self._add_event_handler("chat_deleted", self._on_chat_deleted)
# TODO many more listeners # TODO many more listeners
self._rpc_client.set_event_handlers(self._get_user_cmd("disconnected"), [self._on_listen_disconnect]) self._add_event_handler("disconnected", self._on_listen_disconnect)
self._rpc_client.set_event_handlers(self._get_user_cmd("switch_server"), [self._on_switch_server]) self._add_event_handler("switch_server", self._on_switch_server)
def _stop_listen(self) -> None: def _stop_listen(self) -> None:
# TODO Automate this somehow, like with a fancy enum for method in self._handler_methods:
self._rpc_client.set_event_handlers(self._get_user_cmd("chat"), []) self._rpc_client.set_event_handlers(self._get_user_cmd(method), [])
# TODO many more listeners
self._rpc_client.set_event_handlers(self._get_user_cmd("disconnected"), [])
self._rpc_client.set_event_handlers(self._get_user_cmd("switch_server"), []) def _add_event_handler(self, method: str, handler: EventHandler):
self._rpc_client.set_event_handlers(self._get_user_cmd(method), [handler])
self._handler_methods.append(method)
def _get_user_cmd(self, command) -> str: def _get_user_cmd(self, command) -> str:
return f"{command}:{self.user.mxid}" return f"{command}:{self.user.mxid}"

View File

@ -15,29 +15,23 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations from __future__ import annotations
from typing import TYPE_CHECKING, Union from typing import TYPE_CHECKING
from mautrix.bridge import BaseMatrixHandler from mautrix.bridge import BaseMatrixHandler
from mautrix.errors import MatrixError
from mautrix.types import ( from mautrix.types import (
Event, Event,
EventID, EventID,
EventType, EventType,
MessageType,
PresenceEvent,
ReactionEvent, ReactionEvent,
ReactionEventContent, ReactionEventContent,
ReceiptEvent,
RedactionEvent, RedactionEvent,
RelationType, RelationType,
RoomID, RoomID,
SingleReceiptEventContent, SingleReceiptEventContent,
TextMessageEventContent,
TypingEvent,
UserID, UserID,
) )
from . import portal as po, puppet as pu, user as u from . import portal as po, user as u
if TYPE_CHECKING: if TYPE_CHECKING:
from .__main__ import KakaoTalkBridge from .__main__ import KakaoTalkBridge
@ -147,14 +141,6 @@ class MatrixHandler(BaseMatrixHandler):
user, event_id, content.relates_to.event_id, content.relates_to.key user, event_id, content.relates_to.event_id, content.relates_to.key
) )
@staticmethod
async def handle_typing(room_id: RoomID, typing: list[UserID]) -> None:
portal = await po.Portal.get_by_mxid(room_id)
if not portal or not portal.is_direct:
return
await portal.handle_matrix_typing(set(typing))
async def handle_read_receipt( async def handle_read_receipt(
self, self,
user: u.User, user: u.User,
@ -176,14 +162,6 @@ class MatrixHandler(BaseMatrixHandler):
) )
""" """
async def handle_ephemeral_event(
self, evt: Union[ReceiptEvent, PresenceEvent, TypingEvent]
) -> None:
if evt.type == EventType.TYPING:
await self.handle_typing(evt.room_id, evt.content.user_ids)
elif evt.type == EventType.RECEIPT:
await self.handle_receipt(evt)
async def handle_event(self, evt: Event) -> None: async def handle_event(self, evt: Event) -> None:
if evt.type == EventType.ROOM_REDACTION: if evt.type == EventType.ROOM_REDACTION:
evt: RedactionEvent evt: RedactionEvent

View File

@ -127,7 +127,6 @@ class Portal(DBPortal, BasePortal):
_create_room_lock: asyncio.Lock _create_room_lock: asyncio.Lock
_send_locks: dict[int, asyncio.Lock] _send_locks: dict[int, asyncio.Lock]
_noop_lock: FakeLock = FakeLock() _noop_lock: FakeLock = FakeLock()
_typing: set[UserID]
backfill_lock: SimpleLock backfill_lock: SimpleLock
_backfill_leave: set[IntentAPI] | None _backfill_leave: set[IntentAPI] | None
_sleeping_to_resync: bool _sleeping_to_resync: bool
@ -167,7 +166,6 @@ class Portal(DBPortal, BasePortal):
self._kt_sender = None self._kt_sender = None
self._create_room_lock = asyncio.Lock() self._create_room_lock = asyncio.Lock()
self._send_locks = {} self._send_locks = {}
self._typing = set()
self._sleeping_to_resync = False self._sleeping_to_resync = False
self._scheduled_resync = None self._scheduled_resync = None
self._resync_targets = {} self._resync_targets = {}
@ -191,14 +189,14 @@ class Portal(DBPortal, BasePortal):
NotificationDisabler.config_enabled = cls.config["bridge.backfill.disable_notifications"] NotificationDisabler.config_enabled = cls.config["bridge.backfill.disable_notifications"]
# TODO More # TODO More
cls._message_type_handler_map = { cls._chat_type_handler_map = {
KnownChatType.TEXT: cls._handle_remote_text, KnownChatType.TEXT: cls._handle_kakaotalk_text,
KnownChatType.REPLY: cls._handle_remote_reply, KnownChatType.REPLY: cls._handle_kakaotalk_reply,
KnownChatType.PHOTO: cls._handle_remote_photo, KnownChatType.PHOTO: cls._handle_kakaotalk_photo,
KnownChatType.MULTIPHOTO: cls._handle_remote_multiphoto, KnownChatType.MULTIPHOTO: cls._handle_kakaotalk_multiphoto,
KnownChatType.VIDEO: cls._handle_remote_video, KnownChatType.VIDEO: cls._handle_kakaotalk_video,
KnownChatType.AUDIO: cls._handle_remote_audio, KnownChatType.AUDIO: cls._handle_kakaotalk_audio,
#KnownChatType.FILE: cls._handle_remote_file, #KnownChatType.FILE: cls._handle_kakaotalk_file,
} }
# region DB conversion # region DB conversion
@ -318,7 +316,7 @@ class Portal(DBPortal, BasePortal):
return info return info
@classmethod @classmethod
async def _reupload_remote_file( async def _reupload_kakaotalk_file(
cls, cls,
url: str, url: str,
source: u.User, source: u.User,
@ -808,7 +806,7 @@ class Portal(DBPortal, BasePortal):
) -> None: ) -> None:
converted = await matrix_to_kakaotalk(message, self.mxid, self.log, self.main_intent) converted = await matrix_to_kakaotalk(message, self.mxid, self.log, self.main_intent)
try: try:
chatlog = await sender.client.send_message( chatlog = await sender.client.send_chat(
self.channel_props, self.channel_props,
text=converted.text, text=converted.text,
reply_to=converted.reply_to, reply_to=converted.reply_to,
@ -881,24 +879,61 @@ class Portal(DBPortal, BasePortal):
self, sender: u.User, event_id: EventID, redaction_event_id: EventID self, sender: u.User, event_id: EventID, redaction_event_id: EventID
) -> None: ) -> None:
try: try:
await self._handle_matrix_redaction(sender, event_id, redaction_event_id) await self._handle_matrix_redaction(sender, event_id)
except Exception as e: except Exception as e:
self.log.exception(f"Failed to handle Matrix event {event_id}: {e}") self.log.error(
f"Failed to handle Matrix redaction {redaction_event_id}: {e}",
exc_info=not isinstance(e, NotImplementedError),
)
sender.send_remote_checkpoint( sender.send_remote_checkpoint(
self._status_from_exception(e), self._status_from_exception(e),
event_id, redaction_event_id,
self.mxid, self.mxid,
EventType.ROOM_REDACTION, EventType.ROOM_REDACTION,
error=e, error=e,
) )
await self._send_bridge_error(str(e)) if not isinstance(e, NotImplementedError):
await self._send_bridge_error(str(e), thing="redaction")
else: else:
await self._send_delivery_receipt(event_id) await self._send_delivery_receipt(redaction_event_id)
sender.send_remote_checkpoint(
MessageSendCheckpointStatus.SUCCESS,
redaction_event_id,
self.mxid,
EventType.ROOM_REDACTION,
)
async def _handle_matrix_redaction( async def _handle_matrix_redaction(self, sender: u.User, event_id: EventID) -> None:
self, sender: u.User, event_id: EventID, redaction_event_id: EventID sender, _ = await self.get_relay_sender(sender, f"redaction {event_id}")
) -> None: if not sender:
self.log.info("TODO: _handle_matrix_redaction") raise Exception("not logged in")
message = await DBMessage.get_by_mxid(event_id, self.mxid)
if message:
if not message.ktid:
raise NotImplementedError("Tried to redact message whose ktid is unknown")
try:
await message.delete()
await sender.client.delete_chat(self.channel_props, message.ktid)
except Exception as e:
self.log.exception(f"Unsend failed: {e}")
raise
return
raise NotImplementedError("Only message redactions are supported")
""" TODO
reaction = await DBReaction.get_by_mxid(event_id, self.mxid)
if reaction:
try:
await reaction.delete()
await sender.client.react(reaction.kt_msgid, None)
except Exception as e:
self.log.exception(f"Removing reaction failed: {e}")
raise
return
raise NotImplementedError("Only message and reaction redactions are supported")
"""
async def handle_matrix_reaction( async def handle_matrix_reaction(
self, sender: u.User, event_id: EventID, reacting_to: EventID, reaction: str self, sender: u.User, event_id: EventID, reacting_to: EventID, reaction: str
@ -916,16 +951,6 @@ class Portal(DBPortal, BasePortal):
else: else:
self.log.debug(f"{user.mxid} left portal to {self.ktid}") self.log.debug(f"{user.mxid} left portal to {self.ktid}")
async def _set_typing(self, users: set[UserID], typing: bool) -> None:
self.log.info("TODO: _set_typing")
async def handle_matrix_typing(self, users: set[UserID]) -> None:
await asyncio.gather(
self._set_typing(users - self._typing, typing=True),
self._set_typing(self._typing - users, typing=False),
)
self._typing = users
# endregion # endregion
# region KakaoTalk event handling # region KakaoTalk event handling
@ -944,7 +969,7 @@ class Portal(DBPortal, BasePortal):
return False return False
return True return True
async def _add_remote_reply( async def _add_kakaotalk_reply(
self, content: MessageEventContent, reply_to: ReplyAttachment self, content: MessageEventContent, reply_to: ReplyAttachment
) -> None: ) -> None:
message = await DBMessage.get_by_ktid(reply_to.src_logId, self.kt_receiver) message = await DBMessage.get_by_ktid(reply_to.src_logId, self.kt_receiver)
@ -976,34 +1001,34 @@ class Portal(DBPortal, BasePortal):
content.set_reply(evt) content.set_reply(evt)
async def handle_remote_message( async def handle_kakaotalk_chat(
self, self,
source: u.User, source: u.User,
sender: p.Puppet, sender: p.Puppet,
message: Chatlog, chat: Chatlog,
) -> None: ) -> None:
try: try:
await self._handle_remote_message(source, sender, message) await self._handle_kakaotalk_chat(source, sender, chat)
except Exception: except Exception:
self.log.exception( self.log.exception(
"Error handling KakaoTalk message %s", "Error handling KakaoTalk chat %s",
message.logId, chat.logId,
) )
async def _handle_remote_message( async def _handle_kakaotalk_chat(
self, self,
source: u.User, source: u.User,
sender: p.Puppet, sender: p.Puppet,
message: Chatlog, chat: Chatlog,
) -> None: ) -> None:
# TODO Backfill!! This avoids timing conflicts on startup sync # TODO Backfill!! This avoids timing conflicts on startup sync
self.log.debug(f"Handling KakaoTalk event {message.logId}") self.log.debug(f"Handling KakaoTalk event {chat.logId}")
if not self.mxid: if not self.mxid:
mxid = await self.create_matrix_room(source) mxid = await self.create_matrix_room(source)
if not mxid: if not mxid:
# Failed to create # Failed to create
return return
if not await self._bridge_own_message_pm(source, sender, f"message {message.logId}"): if not await self._bridge_own_message_pm(source, sender, f"chat {chat.logId}"):
return return
intent = sender.intent_for(self) intent = sender.intent_for(self)
if ( if (
@ -1017,56 +1042,56 @@ class Portal(DBPortal, BasePortal):
await intent.ensure_joined(self.mxid) await intent.ensure_joined(self.mxid)
self._backfill_leave.add(intent) self._backfill_leave.add(intent)
handler = self._message_type_handler_map.get(message.type, Portal._handle_remote_unsupported) handler = self._chat_type_handler_map.get(chat.type, Portal._handle_kakaotalk_unsupported)
event_ids = [ event_ids = [
event_id for event_id in event_id for event_id in
await handler( await handler(
self, self,
source=source, source=source,
intent=intent, intent=intent,
attachment=message.attachment, attachment=chat.attachment,
timestamp=message.sendAt, timestamp=chat.sendAt,
message_text=message.text, chat_text=chat.text,
message_type=message.type, chat_type=chat.type,
) )
if event_id if event_id
] ]
if not event_ids: if not event_ids:
self.log.warning(f"Unhandled KakaoTalk message {message.logId}") self.log.warning(f"Unhandled KakaoTalk chat {chat.logId}")
return return
self.log.debug(f"Handled KakaoTalk message {message.logId} -> {event_ids}") self.log.debug(f"Handled KakaoTalk chat {chat.logId} -> {event_ids}")
# TODO Might have to handle remote reactions on messages created by bulk_create # TODO Might have to handle remote reactions on messages created by bulk_create
await DBMessage.bulk_create( await DBMessage.bulk_create(
ktid=message.logId, ktid=chat.logId,
kt_chat=self.ktid, kt_chat=self.ktid,
kt_receiver=self.kt_receiver, kt_receiver=self.kt_receiver,
mx_room=self.mxid, mx_room=self.mxid,
timestamp=message.sendAt, timestamp=chat.sendAt,
event_ids=event_ids, event_ids=event_ids,
) )
await self._send_delivery_receipt(event_ids[-1]) await self._send_delivery_receipt(event_ids[-1])
async def _handle_remote_unsupported( async def _handle_kakaotalk_unsupported(
self, self,
intent: IntentAPI, intent: IntentAPI,
timestamp: int, timestamp: int,
message_text: str | None, chat_text: str | None,
message_type: ChatType, chat_type: ChatType,
**_ **_
) -> Awaitable[list[EventID]]: ) -> Awaitable[list[EventID]]:
try: try:
type_str = KnownChatType(message_type).name.lower() type_str = KnownChatType(chat_type).name.lower()
except ValueError: except ValueError:
type_str = str(message_type) type_str = str(chat_type)
self.log.warning("No handler for message type \"%s\" (%s)", self.log.warning("No handler for chat type \"%s\" (%s)",
type_str, type_str,
f"text = {message_text}" if message_text is not None else "no text", f"text = {chat_text}" if chat_text is not None else "no text",
) )
if message_text: if chat_text:
events = await self._handle_remote_text( events = await self._handle_kakaotalk_text(
intent=intent, intent=intent,
timestamp=timestamp, timestamp=timestamp,
message_text=message_text, chat_text=chat_text,
) )
else: else:
events = [] events = []
@ -1079,40 +1104,40 @@ class Portal(DBPortal, BasePortal):
events.append(await self._send_message(intent, content, timestamp=timestamp)) events.append(await self._send_message(intent, content, timestamp=timestamp))
return events return events
async def _handle_remote_text( async def _handle_kakaotalk_text(
self, self,
intent: IntentAPI, intent: IntentAPI,
attachment: Attachment | None, attachment: Attachment | None,
timestamp: int, timestamp: int,
message_text: str | None, chat_text: str | None,
**_ **_
) -> list[EventID]: ) -> list[EventID]:
content = await kakaotalk_to_matrix(message_text, attachment.mentions if attachment else None) content = await kakaotalk_to_matrix(chat_text, attachment.mentions if attachment else None)
return [await self._send_message(intent, content, timestamp=timestamp)] return [await self._send_message(intent, content, timestamp=timestamp)]
async def _handle_remote_reply( async def _handle_kakaotalk_reply(
self, self,
intent: IntentAPI, intent: IntentAPI,
attachment: ReplyAttachment, attachment: ReplyAttachment,
timestamp: int, timestamp: int,
message_text: str, chat_text: str,
**_ **_
) -> list[EventID]: ) -> list[EventID]:
content = await kakaotalk_to_matrix(message_text, attachment.mentions) content = await kakaotalk_to_matrix(chat_text, attachment.mentions)
await self._add_remote_reply(content, attachment) await self._add_kakaotalk_reply(content, attachment)
return [await self._send_message(intent, content, timestamp=timestamp)] return [await self._send_message(intent, content, timestamp=timestamp)]
def _handle_remote_photo(self, **kwargs) -> Awaitable[list[EventID]]: def _handle_kakaotalk_photo(self, **kwargs) -> Awaitable[list[EventID]]:
return asyncio.gather(self._handle_remote_uniphoto(**kwargs)) return asyncio.gather(self._handle_kakaotalk_uniphoto(**kwargs))
async def _handle_remote_multiphoto( async def _handle_kakaotalk_multiphoto(
self, self,
attachment: MultiPhotoAttachment, attachment: MultiPhotoAttachment,
**kwargs **kwargs
) -> Awaitable[list[EventID]]: ) -> Awaitable[list[EventID]]:
# TODO Upload media concurrently, but post messages sequentially # TODO Upload media concurrently, but post messages sequentially
return [ return [
await self._handle_remote_uniphoto( await self._handle_kakaotalk_uniphoto(
attachment=PhotoAttachment( attachment=PhotoAttachment(
shout=attachment.shout, shout=attachment.shout,
mentions=attachment.mentions, mentions=attachment.mentions,
@ -1133,12 +1158,12 @@ class Portal(DBPortal, BasePortal):
for i in range(len(attachment.imageUrls)) for i in range(len(attachment.imageUrls))
] ]
def _handle_remote_uniphoto( def _handle_kakaotalk_uniphoto(
self, self,
attachment: PhotoAttachment, attachment: PhotoAttachment,
**kwargs **kwargs
) -> Awaitable[EventID]: ) -> Awaitable[EventID]:
return self._handle_remote_media( return self._handle_kakaotalk_media(
attachment, attachment,
ImageInfo( ImageInfo(
mimetype=attachment.mt, mimetype=attachment.mt,
@ -1150,12 +1175,12 @@ class Portal(DBPortal, BasePortal):
**kwargs **kwargs
) )
def _handle_remote_video( def _handle_kakaotalk_video(
self, self,
attachment: VideoAttachment, attachment: VideoAttachment,
**kwargs **kwargs
) -> Awaitable[list[EventID]]: ) -> Awaitable[list[EventID]]:
return asyncio.gather(self._handle_remote_media( return asyncio.gather(self._handle_kakaotalk_media(
attachment, attachment,
VideoInfo( VideoInfo(
duration=attachment.d, duration=attachment.d,
@ -1166,12 +1191,12 @@ class Portal(DBPortal, BasePortal):
**kwargs **kwargs
)) ))
def _handle_remote_audio( def _handle_kakaotalk_audio(
self, self,
attachment: AudioAttachment, attachment: AudioAttachment,
**kwargs **kwargs
) -> Awaitable[list[EventID]]: ) -> Awaitable[list[EventID]]:
return asyncio.gather(self._handle_remote_media( return asyncio.gather(self._handle_kakaotalk_media(
attachment, attachment,
AudioInfo( AudioInfo(
size=attachment.s, size=attachment.s,
@ -1182,12 +1207,12 @@ class Portal(DBPortal, BasePortal):
)) ))
""" TODO Find what auth is required for reading file contents """ TODO Find what auth is required for reading file contents
def _handle_remote_file( def _handle_kakaotalk_file(
self, self,
attachment: FileAttachment, attachment: FileAttachment,
**kwargs **kwargs
) -> Awaitable[list[EventID]]: ) -> Awaitable[list[EventID]]:
return asyncio.gather(self._handle_remote_media( return asyncio.gather(self._handle_kakaotalk_media(
attachment, attachment,
FileInfo( FileInfo(
size=attachment.size, size=attachment.size,
@ -1197,7 +1222,7 @@ class Portal(DBPortal, BasePortal):
)) ))
""" """
async def _handle_remote_media( async def _handle_kakaotalk_media(
self, self,
attachment: MediaAttachment, attachment: MediaAttachment,
info: MediaInfo, info: MediaInfo,
@ -1206,10 +1231,10 @@ class Portal(DBPortal, BasePortal):
source: u.User, source: u.User,
intent: IntentAPI, intent: IntentAPI,
timestamp: int, timestamp: int,
message_text: str | None, chat_text: str | None,
**_ **_
) -> EventID: ) -> EventID:
mxc, additional_info, decryption_info = await self._reupload_remote_file( mxc, additional_info, decryption_info = await self._reupload_kakaotalk_file(
attachment.url, attachment.url,
source, source,
intent, intent,
@ -1220,10 +1245,27 @@ class Portal(DBPortal, BasePortal):
info.size = additional_info.size info.size = additional_info.size
info.mimetype = additional_info.mimetype info.mimetype = additional_info.mimetype
content = MediaMessageEventContent( content = MediaMessageEventContent(
url=mxc, file=decryption_info, msgtype=msgtype, body=message_text, info=info url=mxc, file=decryption_info, msgtype=msgtype, body=chat_text, info=info
) )
return await self._send_message(intent, content, timestamp=timestamp) return await self._send_message(intent, content, timestamp=timestamp)
async def handle_kakaotalk_chat_delete(
self,
sender: p.Puppet,
chat_id: Long,
timestamp: int,
) -> None:
if not self.mxid:
return
for message in await DBMessage.get_all_by_ktid(chat_id, self.kt_receiver):
try:
await sender.intent_for(self).redact(
message.mx_room, message.mxid, timestamp=timestamp
)
except MForbidden:
await self.main_intent.redact(message.mx_room, message.mxid, timestamp=timestamp)
await message.delete()
# TODO Many more remote handlers # TODO Many more remote handlers
# endregion # endregion
@ -1273,24 +1315,24 @@ class Portal(DBPortal, BasePortal):
) -> None: ) -> None:
self.log.debug(f"Backfilling history through {source.mxid}") self.log.debug(f"Backfilling history through {source.mxid}")
self.log.debug(f"Fetching {f'up to {limit}' if limit else 'all'} messages through {source.ktid}") self.log.debug(f"Fetching {f'up to {limit}' if limit else 'all'} messages through {source.ktid}")
messages = await source.client.get_chats( chats = await source.client.get_chats(
self.channel_props, self.channel_props,
after_log_id, after_log_id,
limit, limit,
) )
if not messages: if not chats:
self.log.debug("Didn't get any messages from server") self.log.debug("Didn't get any messages from server")
return return
self.log.debug(f"Got {len(messages)} message{'s' if len(messages) > 1 else ''} from server") self.log.debug(f"Got {len(chats)} message{'s' if len(chats) > 1 else ''} from server")
self._backfill_leave = set() self._backfill_leave = set()
async with NotificationDisabler(self.mxid, source): async with NotificationDisabler(self.mxid, source):
for message in messages: for chat in chats:
puppet = await p.Puppet.get_by_ktid(message.sender.userId) puppet = await p.Puppet.get_by_ktid(chat.sender.userId)
await self.handle_remote_message(source, puppet, message) await self.handle_kakaotalk_chat(source, puppet, chat)
for intent in self._backfill_leave: for intent in self._backfill_leave:
self.log.trace("Leaving room with %s post-backfill", intent.mxid) self.log.trace("Leaving room with %s post-backfill", intent.mxid)
await intent.leave_room(self.mxid) await intent.leave_room(self.mxid)
self.log.info("Backfilled %d messages through %s", len(messages), source.mxid) self.log.info("Backfilled %d messages through %s", len(chats), source.mxid)
# region Database getters # region Database getters

View File

@ -1,2 +1,2 @@
from .rpc import RPCClient from .rpc import RPCClient, EventHandler
from .types import RPCError from .types import RPCError

View File

@ -275,6 +275,9 @@ class RPCClient:
except asyncio.LimitOverrunError as e: except asyncio.LimitOverrunError as e:
self.log.warning(f"Buffer overrun: {e}") self.log.warning(f"Buffer overrun: {e}")
line += await self._reader.read(self._reader._limit) line += await self._reader.read(self._reader._limit)
except ConnectionResetError:
if self._reader is not None:
raise
except asyncio.CancelledError: except asyncio.CancelledError:
raise raise
if not line: if not line:

View File

@ -48,7 +48,8 @@ from .kt.types.openlink.open_channel_info import OpenChannelData, OpenChannelInf
from .kt.types.packet.chat.kickout import KnownKickoutType, KickoutRes from .kt.types.packet.chat.kickout import KnownKickoutType, KickoutRes
METRIC_CONNECT_AND_SYNC = Summary("bridge_connect_and_sync", "calls to connect_and_sync") METRIC_CONNECT_AND_SYNC = Summary("bridge_connect_and_sync", "calls to connect_and_sync")
METRIC_MESSAGE = Summary("bridge_on_message", "calls to on_message") METRIC_CHAT = Summary("bridge_on_chat", "calls to on_chat")
METRIC_CHAT_DELETED = Summary("bridge_on_chat_deleted", "calls to on_chat_deleted")
METRIC_LOGGED_IN = Gauge("bridge_logged_in", "Users logged into the bridge") METRIC_LOGGED_IN = Gauge("bridge_logged_in", "Users logged into the bridge")
METRIC_CONNECTED = Gauge("bridge_connected", "Bridge users connected to KakaoTalk") METRIC_CONNECTED = Gauge("bridge_connected", "Bridge users connected to KakaoTalk")
@ -658,18 +659,38 @@ class User(DBUser, BaseUser):
self._logged_in_info_time = time.monotonic() self._logged_in_info_time = time.monotonic()
asyncio.create_task(self.post_login(is_startup=True)) asyncio.create_task(self.post_login(is_startup=True))
@async_time(METRIC_MESSAGE) @async_time(METRIC_CHAT)
async def on_message(self, evt: Chatlog, channel_id: Long, channel_type: ChannelType) -> None: async def on_chat(self, chat: Chatlog, channel_id: Long, channel_type: ChannelType) -> None:
portal = await po.Portal.get_by_ktid( portal = await po.Portal.get_by_ktid(
channel_id, channel_id,
kt_receiver=self.ktid, kt_receiver=self.ktid,
kt_type=channel_type kt_type=channel_type
) )
puppet = await pu.Puppet.get_by_ktid(evt.sender.userId) puppet = await pu.Puppet.get_by_ktid(chat.sender.userId)
await portal.backfill_lock.wait(evt.logId) await portal.backfill_lock.wait(chat.logId)
if not puppet.name: if not puppet.name:
portal.schedule_resync(self, puppet) portal.schedule_resync(self, puppet)
await portal.handle_remote_message(self, puppet, evt) await portal.handle_kakaotalk_chat(self, puppet, chat)
@async_time(METRIC_CHAT_DELETED)
async def on_chat_deleted(
self,
chat_id: Long,
sender_id: Long,
timestamp: int,
channel_id: Long,
channel_type: ChannelType,
) -> None:
portal = await po.Portal.get_by_ktid(
channel_id,
kt_receiver=self.ktid,
kt_type=channel_type,
create=False
)
if portal and portal.mxid:
await portal.backfill_lock.wait(f"redaction of {chat_id}")
puppet = await pu.Puppet.get_by_ktid(sender_id)
await portal.handle_kakaotalk_chat_delete(puppet, chat_id, timestamp)
# TODO Many more handlers # TODO Many more handlers

View File

@ -34,6 +34,12 @@ const { KnownChatType } = chat
import { emitLines, promisify } from "./util.js" import { emitLines, promisify } from "./util.js"
/**
* @typedef {Object} ChannelProps
* @property {Long} id
* @property {ChannelType} type
*/
ServiceApiClient.prototype.requestFriendList = async function() { ServiceApiClient.prototype.requestFriendList = async function() {
const res = await this._client.requestData( const res = await this._client.requestData(
@ -77,7 +83,7 @@ class UserClient {
this.peerClient = peerClient this.peerClient = peerClient
this.#talkClient.on("chat", (data, channel) => { this.#talkClient.on("chat", (data, channel) => {
this.log(`Received chat message ${data.chat.logId} in channel ${channel.channelId}`) this.log(`${data.chat.logId} received in channel ${channel.channelId}`)
return this.write("chat", { return this.write("chat", {
//is_sequential: true, // TODO Make sequential per user & channel (if it isn't already) //is_sequential: true, // TODO Make sequential per user & channel (if it isn't already)
chatlog: data.chat, chatlog: data.chat,
@ -86,6 +92,28 @@ class UserClient {
}) })
}) })
this.#talkClient.on("chat_deleted", (feedChatlog, channel, feed) => {
this.log(`${feed.logId} deleted in channel ${channel.channelId} by user ${feedChatlog.sender.userId}`);
return this.write("chat_deleted", {
chatId: feed.logId,
senderId: feedChatlog.sender.userId,
timestamp: feedChatlog.sendAt,
channelId: channel.channelId,
channelType: channel.info.type,
})
})
this.#talkClient.on("message_hidden", (hideLog, channel, feed) => {
this.log(`Message ${hideLog.logId} hid from channel ${channel.channelId} by user ${hideLog.sender.userId}`);
return this.write("chat_deleted", {
chatId: feed.logId,
senderId: hideLog.sender.userId,
timestamp: hideLog.sendAt,
channelId: channel.channelId,
channelType: channel.info.type,
})
})
/* TODO Many more listeners /* TODO Many more listeners
this.#talkClient.on("chat_read", (chat, channel, reader) => { this.#talkClient.on("chat_read", (chat, channel, reader) => {
this.log(`chat_read in channel ${channel.channelId}`) this.log(`chat_read in channel ${channel.channelId}`)
@ -132,24 +160,22 @@ class UserClient {
} }
/** /**
* @param {Object} channel_props * @param {ChannelProps} channelProps
* @param {Long} channel_props.id
* @param {ChannelType} channel_props.type
*/ */
async getChannel(channel_props) { async getChannel(channelProps) {
let channel = this.#talkClient.channelList.get(channel_props.id) let channel = this.#talkClient.channelList.get(channelProps.id)
if (channel) { if (channel) {
return channel return channel
} else { } else {
const channelList = getChannelListForType( const channelList = getChannelListForType(
this.#talkClient.channelList, this.#talkClient.channelList,
channel_props.type channelProps.type
) )
const res = await channelList.addChannel({ const res = await channelList.addChannel({
channelId: channel_props.id, channelId: channelProps.id,
}) })
if (!res.success) { if (!res.success) {
throw new Error(`Unable to add ${channel_props.type} channel ${channel_props.id}`) throw new Error(`Unable to add ${channelProps.type} channel ${channelProps.id}`)
} }
return res.result return res.result
} }
@ -347,10 +373,10 @@ export default class PeerClient {
/** /**
* @param {string} mxid * @param {string} mxid
* @param {Object} channel_props * @param {ChannelProps} channelProps
*/ */
async #getUserChannel(mxid, channel_props) { async #getUserChannel(mxid, channelProps) {
return await this.#getUser(mxid).getChannel(channel_props) return await this.#getUser(mxid).getChannel(channelProps)
} }
/** /**
@ -424,7 +450,7 @@ export default class PeerClient {
/** /**
* @param {Object} req * @param {Object} req
* @param {string} req.mxid * @param {string} req.mxid
* @param {Object} req.channel_props * @param {ChannelProps} req.channel_props
*/ */
getPortalChannelInfo = async (req) => { getPortalChannelInfo = async (req) => {
const talkChannel = await this.#getUserChannel(req.mxid, req.channel_props) const talkChannel = await this.#getUserChannel(req.mxid, req.channel_props)
@ -442,7 +468,7 @@ export default class PeerClient {
/** /**
* @param {Object} req * @param {Object} req
* @param {string} req.mxid * @param {string} req.mxid
* @param {Object} req.channel_props * @param {ChannelProps} req.channel_props
*/ */
getParticipants = async (req) => { getParticipants = async (req) => {
const talkChannel = await this.#getUserChannel(req.mxid, req.channel_props) const talkChannel = await this.#getUserChannel(req.mxid, req.channel_props)
@ -452,7 +478,7 @@ export default class PeerClient {
/** /**
* @param {Object} req * @param {Object} req
* @param {string} req.mxid * @param {string} req.mxid
* @param {Object} req.channel_props * @param {ChannelProps} req.channel_props
* @param {?Long} req.sync_from * @param {?Long} req.sync_from
* @param {?Number} req.limit * @param {?Number} req.limit
*/ */
@ -506,7 +532,7 @@ export default class PeerClient {
/** /**
* @param {Object} req * @param {Object} req
* @param {string} req.mxid * @param {string} req.mxid
* @param {Object} req.channel_props * @param {ChannelProps} req.channel_props
* @param {string} req.text * @param {string} req.text
* @param {?ReplyAttachment} req.reply_to * @param {?ReplyAttachment} req.reply_to
* @param {?MentionStruct[]} req.mentions * @param {?MentionStruct[]} req.mentions
@ -524,7 +550,7 @@ export default class PeerClient {
/** /**
* @param {Object} req * @param {Object} req
* @param {string} req.mxid * @param {string} req.mxid
* @param {Object} req.channel_props * @param {ChannelProps} req.channel_props
* @param {int} req.type * @param {int} req.type
* @param {number[]} req.data * @param {number[]} req.data
* @param {string} req.name * @param {string} req.name
@ -544,6 +570,20 @@ export default class PeerClient {
}) })
} }
/**
* @param {Object} req
* @param {string} req.mxid
* @param {ChannelProps} req.channel_props
* @param {Long} req.chat_id
*/
deleteChat = async (req) => {
const talkChannel = await this.#getUserChannel(req.mxid, req.channel_props)
return await talkChannel.deleteChat({
logId: req.chat_id,
})
}
#makeCommandResult(result) { #makeCommandResult(result) {
return { return {
success: true, success: true,
@ -627,6 +667,7 @@ export default class PeerClient {
get_memo_ids: this.getMemoIds, get_memo_ids: this.getMemoIds,
send_chat: this.sendChat, send_chat: this.sendChat,
send_media: this.sendMedia, send_media: this.sendMedia,
delete_chat: this.deleteChat,
}[req.command] || this.handleUnknownCommand }[req.command] || this.handleUnknownCommand
} }
const resp = { id: req.id } const resp = { id: req.id }
@ -676,11 +717,18 @@ export default class PeerClient {
* @param {ChannelType} channelType * @param {ChannelType} channelType
*/ */
function getChannelListForType(channelList, channelType) { function getChannelListForType(channelList, channelType) {
return isChannelTypeOpen(channelType) ? channelList.open : channelList.normal
}
/**
* @param {ChannelType} channelType
*/
function isChannelTypeOpen(channelType) {
switch (channelType) { switch (channelType) {
case "OM": case "OM":
case "OD": case "OD":
return channelList.open return true
default: default:
return channelList.normal return false
} }
} }