Rename remote->kakaotalk and message->chat
This commit is contained in:
parent
f7d889486a
commit
60b115bd38
@ -301,7 +301,7 @@ class Client:
|
||||
await self._rpc_client.request("get_memo_ids", mxid=self.user.mxid)
|
||||
)
|
||||
|
||||
async def send_message(
|
||||
async def send_chat(
|
||||
self,
|
||||
channel_props: ChannelProps,
|
||||
text: str,
|
||||
@ -375,15 +375,15 @@ class Client:
|
||||
|
||||
# region listeners
|
||||
|
||||
async def _on_message(self, data: dict[str, JSON]) -> None:
|
||||
await self.user.on_message(
|
||||
async def _on_chat(self, data: dict[str, JSON]) -> None:
|
||||
await self.user.on_chat(
|
||||
Chatlog.deserialize(data["chatlog"]),
|
||||
Long.deserialize(data["channelId"]),
|
||||
str(data["channelType"]),
|
||||
)
|
||||
|
||||
async def _on_message_deleted(self, data: dict[str, JSON]) -> None:
|
||||
await self.user.on_message_deleted(
|
||||
async def _on_chat_deleted(self, data: dict[str, JSON]) -> None:
|
||||
await self.user.on_chat_deleted(
|
||||
Long.deserialize(data["chatId"]),
|
||||
Long.deserialize(data["senderId"]),
|
||||
int(data["timestamp"]),
|
||||
@ -414,8 +414,8 @@ class Client:
|
||||
|
||||
|
||||
def _start_listen(self) -> None:
|
||||
self._add_event_handler("chat", self._on_message)
|
||||
self._add_event_handler("chat_deleted", self._on_message_deleted)
|
||||
self._add_event_handler("chat", self._on_chat)
|
||||
self._add_event_handler("chat_deleted", self._on_chat_deleted)
|
||||
# TODO many more listeners
|
||||
self._add_event_handler("disconnected", self._on_listen_disconnect)
|
||||
self._add_event_handler("switch_server", self._on_switch_server)
|
||||
|
@ -191,14 +191,14 @@ class Portal(DBPortal, BasePortal):
|
||||
NotificationDisabler.config_enabled = cls.config["bridge.backfill.disable_notifications"]
|
||||
|
||||
# TODO More
|
||||
cls._message_type_handler_map = {
|
||||
KnownChatType.TEXT: cls._handle_remote_text,
|
||||
KnownChatType.REPLY: cls._handle_remote_reply,
|
||||
KnownChatType.PHOTO: cls._handle_remote_photo,
|
||||
KnownChatType.MULTIPHOTO: cls._handle_remote_multiphoto,
|
||||
KnownChatType.VIDEO: cls._handle_remote_video,
|
||||
KnownChatType.AUDIO: cls._handle_remote_audio,
|
||||
#KnownChatType.FILE: cls._handle_remote_file,
|
||||
cls._chat_type_handler_map = {
|
||||
KnownChatType.TEXT: cls._handle_kakaotalk_text,
|
||||
KnownChatType.REPLY: cls._handle_kakaotalk_reply,
|
||||
KnownChatType.PHOTO: cls._handle_kakaotalk_photo,
|
||||
KnownChatType.MULTIPHOTO: cls._handle_kakaotalk_multiphoto,
|
||||
KnownChatType.VIDEO: cls._handle_kakaotalk_video,
|
||||
KnownChatType.AUDIO: cls._handle_kakaotalk_audio,
|
||||
#KnownChatType.FILE: cls._handle_kakaotalk_file,
|
||||
}
|
||||
|
||||
# region DB conversion
|
||||
@ -318,7 +318,7 @@ class Portal(DBPortal, BasePortal):
|
||||
return info
|
||||
|
||||
@classmethod
|
||||
async def _reupload_remote_file(
|
||||
async def _reupload_kakaotalk_file(
|
||||
cls,
|
||||
url: str,
|
||||
source: u.User,
|
||||
@ -808,7 +808,7 @@ class Portal(DBPortal, BasePortal):
|
||||
) -> None:
|
||||
converted = await matrix_to_kakaotalk(message, self.mxid, self.log, self.main_intent)
|
||||
try:
|
||||
chatlog = await sender.client.send_message(
|
||||
chatlog = await sender.client.send_chat(
|
||||
self.channel_props,
|
||||
text=converted.text,
|
||||
reply_to=converted.reply_to,
|
||||
@ -944,7 +944,7 @@ class Portal(DBPortal, BasePortal):
|
||||
return False
|
||||
return True
|
||||
|
||||
async def _add_remote_reply(
|
||||
async def _add_kakaotalk_reply(
|
||||
self, content: MessageEventContent, reply_to: ReplyAttachment
|
||||
) -> None:
|
||||
message = await DBMessage.get_by_ktid(reply_to.src_logId, self.kt_receiver)
|
||||
@ -976,34 +976,34 @@ class Portal(DBPortal, BasePortal):
|
||||
|
||||
content.set_reply(evt)
|
||||
|
||||
async def handle_remote_message(
|
||||
async def handle_kakaotalk_chat(
|
||||
self,
|
||||
source: u.User,
|
||||
sender: p.Puppet,
|
||||
message: Chatlog,
|
||||
chat: Chatlog,
|
||||
) -> None:
|
||||
try:
|
||||
await self._handle_remote_message(source, sender, message)
|
||||
await self._handle_kakaotalk_chat(source, sender, chat)
|
||||
except Exception:
|
||||
self.log.exception(
|
||||
"Error handling KakaoTalk message %s",
|
||||
message.logId,
|
||||
"Error handling KakaoTalk chat %s",
|
||||
chat.logId,
|
||||
)
|
||||
|
||||
async def _handle_remote_message(
|
||||
async def _handle_kakaotalk_chat(
|
||||
self,
|
||||
source: u.User,
|
||||
sender: p.Puppet,
|
||||
message: Chatlog,
|
||||
chat: Chatlog,
|
||||
) -> None:
|
||||
# TODO Backfill!! This avoids timing conflicts on startup sync
|
||||
self.log.debug(f"Handling KakaoTalk event {message.logId}")
|
||||
self.log.debug(f"Handling KakaoTalk event {chat.logId}")
|
||||
if not self.mxid:
|
||||
mxid = await self.create_matrix_room(source)
|
||||
if not mxid:
|
||||
# Failed to create
|
||||
return
|
||||
if not await self._bridge_own_message_pm(source, sender, f"message {message.logId}"):
|
||||
if not await self._bridge_own_message_pm(source, sender, f"chat {chat.logId}"):
|
||||
return
|
||||
intent = sender.intent_for(self)
|
||||
if (
|
||||
@ -1017,56 +1017,56 @@ class Portal(DBPortal, BasePortal):
|
||||
await intent.ensure_joined(self.mxid)
|
||||
self._backfill_leave.add(intent)
|
||||
|
||||
handler = self._message_type_handler_map.get(message.type, Portal._handle_remote_unsupported)
|
||||
handler = self._chat_type_handler_map.get(chat.type, Portal._handle_kakaotalk_unsupported)
|
||||
event_ids = [
|
||||
event_id for event_id in
|
||||
await handler(
|
||||
self,
|
||||
source=source,
|
||||
intent=intent,
|
||||
attachment=message.attachment,
|
||||
timestamp=message.sendAt,
|
||||
message_text=message.text,
|
||||
message_type=message.type,
|
||||
attachment=chat.attachment,
|
||||
timestamp=chat.sendAt,
|
||||
chat_text=chat.text,
|
||||
chat_type=chat.type,
|
||||
)
|
||||
if event_id
|
||||
]
|
||||
if not event_ids:
|
||||
self.log.warning(f"Unhandled KakaoTalk message {message.logId}")
|
||||
self.log.warning(f"Unhandled KakaoTalk chat {chat.logId}")
|
||||
return
|
||||
self.log.debug(f"Handled KakaoTalk message {message.logId} -> {event_ids}")
|
||||
self.log.debug(f"Handled KakaoTalk chat {chat.logId} -> {event_ids}")
|
||||
# TODO Might have to handle remote reactions on messages created by bulk_create
|
||||
await DBMessage.bulk_create(
|
||||
ktid=message.logId,
|
||||
ktid=chat.logId,
|
||||
kt_chat=self.ktid,
|
||||
kt_receiver=self.kt_receiver,
|
||||
mx_room=self.mxid,
|
||||
timestamp=message.sendAt,
|
||||
timestamp=chat.sendAt,
|
||||
event_ids=event_ids,
|
||||
)
|
||||
await self._send_delivery_receipt(event_ids[-1])
|
||||
|
||||
async def _handle_remote_unsupported(
|
||||
async def _handle_kakaotalk_unsupported(
|
||||
self,
|
||||
intent: IntentAPI,
|
||||
timestamp: int,
|
||||
message_text: str | None,
|
||||
message_type: ChatType,
|
||||
chat_text: str | None,
|
||||
chat_type: ChatType,
|
||||
**_
|
||||
) -> Awaitable[list[EventID]]:
|
||||
try:
|
||||
type_str = KnownChatType(message_type).name.lower()
|
||||
type_str = KnownChatType(chat_type).name.lower()
|
||||
except ValueError:
|
||||
type_str = str(message_type)
|
||||
self.log.warning("No handler for message type \"%s\" (%s)",
|
||||
type_str = str(chat_type)
|
||||
self.log.warning("No handler for chat type \"%s\" (%s)",
|
||||
type_str,
|
||||
f"text = {message_text}" if message_text is not None else "no text",
|
||||
f"text = {chat_text}" if chat_text is not None else "no text",
|
||||
)
|
||||
if message_text:
|
||||
events = await self._handle_remote_text(
|
||||
if chat_text:
|
||||
events = await self._handle_kakaotalk_text(
|
||||
intent=intent,
|
||||
timestamp=timestamp,
|
||||
message_text=message_text,
|
||||
chat_text=chat_text,
|
||||
)
|
||||
else:
|
||||
events = []
|
||||
@ -1079,40 +1079,40 @@ class Portal(DBPortal, BasePortal):
|
||||
events.append(await self._send_message(intent, content, timestamp=timestamp))
|
||||
return events
|
||||
|
||||
async def _handle_remote_text(
|
||||
async def _handle_kakaotalk_text(
|
||||
self,
|
||||
intent: IntentAPI,
|
||||
attachment: Attachment | None,
|
||||
timestamp: int,
|
||||
message_text: str | None,
|
||||
chat_text: str | None,
|
||||
**_
|
||||
) -> list[EventID]:
|
||||
content = await kakaotalk_to_matrix(message_text, attachment.mentions if attachment else None)
|
||||
content = await kakaotalk_to_matrix(chat_text, attachment.mentions if attachment else None)
|
||||
return [await self._send_message(intent, content, timestamp=timestamp)]
|
||||
|
||||
async def _handle_remote_reply(
|
||||
async def _handle_kakaotalk_reply(
|
||||
self,
|
||||
intent: IntentAPI,
|
||||
attachment: ReplyAttachment,
|
||||
timestamp: int,
|
||||
message_text: str,
|
||||
chat_text: str,
|
||||
**_
|
||||
) -> list[EventID]:
|
||||
content = await kakaotalk_to_matrix(message_text, attachment.mentions)
|
||||
await self._add_remote_reply(content, attachment)
|
||||
content = await kakaotalk_to_matrix(chat_text, attachment.mentions)
|
||||
await self._add_kakaotalk_reply(content, attachment)
|
||||
return [await self._send_message(intent, content, timestamp=timestamp)]
|
||||
|
||||
def _handle_remote_photo(self, **kwargs) -> Awaitable[list[EventID]]:
|
||||
return asyncio.gather(self._handle_remote_uniphoto(**kwargs))
|
||||
def _handle_kakaotalk_photo(self, **kwargs) -> Awaitable[list[EventID]]:
|
||||
return asyncio.gather(self._handle_kakaotalk_uniphoto(**kwargs))
|
||||
|
||||
async def _handle_remote_multiphoto(
|
||||
async def _handle_kakaotalk_multiphoto(
|
||||
self,
|
||||
attachment: MultiPhotoAttachment,
|
||||
**kwargs
|
||||
) -> Awaitable[list[EventID]]:
|
||||
# TODO Upload media concurrently, but post messages sequentially
|
||||
return [
|
||||
await self._handle_remote_uniphoto(
|
||||
await self._handle_kakaotalk_uniphoto(
|
||||
attachment=PhotoAttachment(
|
||||
shout=attachment.shout,
|
||||
mentions=attachment.mentions,
|
||||
@ -1133,12 +1133,12 @@ class Portal(DBPortal, BasePortal):
|
||||
for i in range(len(attachment.imageUrls))
|
||||
]
|
||||
|
||||
def _handle_remote_uniphoto(
|
||||
def _handle_kakaotalk_uniphoto(
|
||||
self,
|
||||
attachment: PhotoAttachment,
|
||||
**kwargs
|
||||
) -> Awaitable[EventID]:
|
||||
return self._handle_remote_media(
|
||||
return self._handle_kakaotalk_media(
|
||||
attachment,
|
||||
ImageInfo(
|
||||
mimetype=attachment.mt,
|
||||
@ -1150,12 +1150,12 @@ class Portal(DBPortal, BasePortal):
|
||||
**kwargs
|
||||
)
|
||||
|
||||
def _handle_remote_video(
|
||||
def _handle_kakaotalk_video(
|
||||
self,
|
||||
attachment: VideoAttachment,
|
||||
**kwargs
|
||||
) -> Awaitable[list[EventID]]:
|
||||
return asyncio.gather(self._handle_remote_media(
|
||||
return asyncio.gather(self._handle_kakaotalk_media(
|
||||
attachment,
|
||||
VideoInfo(
|
||||
duration=attachment.d,
|
||||
@ -1166,12 +1166,12 @@ class Portal(DBPortal, BasePortal):
|
||||
**kwargs
|
||||
))
|
||||
|
||||
def _handle_remote_audio(
|
||||
def _handle_kakaotalk_audio(
|
||||
self,
|
||||
attachment: AudioAttachment,
|
||||
**kwargs
|
||||
) -> Awaitable[list[EventID]]:
|
||||
return asyncio.gather(self._handle_remote_media(
|
||||
return asyncio.gather(self._handle_kakaotalk_media(
|
||||
attachment,
|
||||
AudioInfo(
|
||||
size=attachment.s,
|
||||
@ -1182,12 +1182,12 @@ class Portal(DBPortal, BasePortal):
|
||||
))
|
||||
|
||||
""" TODO Find what auth is required for reading file contents
|
||||
def _handle_remote_file(
|
||||
def _handle_kakaotalk_file(
|
||||
self,
|
||||
attachment: FileAttachment,
|
||||
**kwargs
|
||||
) -> Awaitable[list[EventID]]:
|
||||
return asyncio.gather(self._handle_remote_media(
|
||||
return asyncio.gather(self._handle_kakaotalk_media(
|
||||
attachment,
|
||||
FileInfo(
|
||||
size=attachment.size,
|
||||
@ -1197,7 +1197,7 @@ class Portal(DBPortal, BasePortal):
|
||||
))
|
||||
"""
|
||||
|
||||
async def _handle_remote_media(
|
||||
async def _handle_kakaotalk_media(
|
||||
self,
|
||||
attachment: MediaAttachment,
|
||||
info: MediaInfo,
|
||||
@ -1206,10 +1206,10 @@ class Portal(DBPortal, BasePortal):
|
||||
source: u.User,
|
||||
intent: IntentAPI,
|
||||
timestamp: int,
|
||||
message_text: str | None,
|
||||
chat_text: str | None,
|
||||
**_
|
||||
) -> EventID:
|
||||
mxc, additional_info, decryption_info = await self._reupload_remote_file(
|
||||
mxc, additional_info, decryption_info = await self._reupload_kakaotalk_file(
|
||||
attachment.url,
|
||||
source,
|
||||
intent,
|
||||
@ -1220,19 +1220,19 @@ class Portal(DBPortal, BasePortal):
|
||||
info.size = additional_info.size
|
||||
info.mimetype = additional_info.mimetype
|
||||
content = MediaMessageEventContent(
|
||||
url=mxc, file=decryption_info, msgtype=msgtype, body=message_text, info=info
|
||||
url=mxc, file=decryption_info, msgtype=msgtype, body=chat_text, info=info
|
||||
)
|
||||
return await self._send_message(intent, content, timestamp=timestamp)
|
||||
|
||||
async def handle_remote_message_delete(
|
||||
async def handle_kakaotalk_chat_delete(
|
||||
self,
|
||||
sender: p.Puppet,
|
||||
message_id: Long,
|
||||
chat_id: Long,
|
||||
timestamp: int,
|
||||
) -> None:
|
||||
if not self.mxid:
|
||||
return
|
||||
for message in await DBMessage.get_all_by_ktid(message_id, self.kt_receiver):
|
||||
for message in await DBMessage.get_all_by_ktid(chat_id, self.kt_receiver):
|
||||
try:
|
||||
await sender.intent_for(self).redact(
|
||||
message.mx_room, message.mxid, timestamp=timestamp
|
||||
@ -1290,24 +1290,24 @@ class Portal(DBPortal, BasePortal):
|
||||
) -> None:
|
||||
self.log.debug(f"Backfilling history through {source.mxid}")
|
||||
self.log.debug(f"Fetching {f'up to {limit}' if limit else 'all'} messages through {source.ktid}")
|
||||
messages = await source.client.get_chats(
|
||||
chats = await source.client.get_chats(
|
||||
self.channel_props,
|
||||
after_log_id,
|
||||
limit,
|
||||
)
|
||||
if not messages:
|
||||
if not chats:
|
||||
self.log.debug("Didn't get any messages from server")
|
||||
return
|
||||
self.log.debug(f"Got {len(messages)} message{'s' if len(messages) > 1 else ''} from server")
|
||||
self.log.debug(f"Got {len(chats)} message{'s' if len(chats) > 1 else ''} from server")
|
||||
self._backfill_leave = set()
|
||||
async with NotificationDisabler(self.mxid, source):
|
||||
for message in messages:
|
||||
puppet = await p.Puppet.get_by_ktid(message.sender.userId)
|
||||
await self.handle_remote_message(source, puppet, message)
|
||||
for chat in chats:
|
||||
puppet = await p.Puppet.get_by_ktid(chat.sender.userId)
|
||||
await self.handle_kakaotalk_chat(source, puppet, chat)
|
||||
for intent in self._backfill_leave:
|
||||
self.log.trace("Leaving room with %s post-backfill", intent.mxid)
|
||||
await intent.leave_room(self.mxid)
|
||||
self.log.info("Backfilled %d messages through %s", len(messages), source.mxid)
|
||||
self.log.info("Backfilled %d messages through %s", len(chats), source.mxid)
|
||||
|
||||
# region Database getters
|
||||
|
||||
|
@ -48,8 +48,8 @@ from .kt.types.openlink.open_channel_info import OpenChannelData, OpenChannelInf
|
||||
from .kt.types.packet.chat.kickout import KnownKickoutType, KickoutRes
|
||||
|
||||
METRIC_CONNECT_AND_SYNC = Summary("bridge_connect_and_sync", "calls to connect_and_sync")
|
||||
METRIC_MESSAGE = Summary("bridge_on_message", "calls to on_message")
|
||||
METRIC_MESSAGE_DELETED = Summary("bridge_on_message_deleted", "calls to on_message_deleted")
|
||||
METRIC_CHAT = Summary("bridge_on_chat", "calls to on_chat")
|
||||
METRIC_CHAT_DELETED = Summary("bridge_on_chat_deleted", "calls to on_chat_deleted")
|
||||
METRIC_LOGGED_IN = Gauge("bridge_logged_in", "Users logged into the bridge")
|
||||
METRIC_CONNECTED = Gauge("bridge_connected", "Bridge users connected to KakaoTalk")
|
||||
|
||||
@ -659,23 +659,23 @@ class User(DBUser, BaseUser):
|
||||
self._logged_in_info_time = time.monotonic()
|
||||
asyncio.create_task(self.post_login(is_startup=True))
|
||||
|
||||
@async_time(METRIC_MESSAGE)
|
||||
async def on_message(self, evt: Chatlog, channel_id: Long, channel_type: ChannelType) -> None:
|
||||
@async_time(METRIC_CHAT)
|
||||
async def on_chat(self, chat: Chatlog, channel_id: Long, channel_type: ChannelType) -> None:
|
||||
portal = await po.Portal.get_by_ktid(
|
||||
channel_id,
|
||||
kt_receiver=self.ktid,
|
||||
kt_type=channel_type
|
||||
)
|
||||
puppet = await pu.Puppet.get_by_ktid(evt.sender.userId)
|
||||
await portal.backfill_lock.wait(evt.logId)
|
||||
puppet = await pu.Puppet.get_by_ktid(chat.sender.userId)
|
||||
await portal.backfill_lock.wait(chat.logId)
|
||||
if not puppet.name:
|
||||
portal.schedule_resync(self, puppet)
|
||||
await portal.handle_remote_message(self, puppet, evt)
|
||||
await portal.handle_kakaotalk_chat(self, puppet, chat)
|
||||
|
||||
@async_time(METRIC_MESSAGE_DELETED)
|
||||
async def on_message_deleted(
|
||||
@async_time(METRIC_CHAT_DELETED)
|
||||
async def on_chat_deleted(
|
||||
self,
|
||||
message_id: Long,
|
||||
chat_id: Long,
|
||||
sender_id: Long,
|
||||
timestamp: int,
|
||||
channel_id: Long,
|
||||
@ -688,9 +688,9 @@ class User(DBUser, BaseUser):
|
||||
create=False
|
||||
)
|
||||
if portal and portal.mxid:
|
||||
await portal.backfill_lock.wait(f"redaction of {message_id}")
|
||||
await portal.backfill_lock.wait(f"redaction of {chat_id}")
|
||||
puppet = await pu.Puppet.get_by_ktid(sender_id)
|
||||
await portal.handle_remote_message_delete(puppet, message_id, timestamp)
|
||||
await portal.handle_kakaotalk_chat_delete(puppet, chat_id, timestamp)
|
||||
|
||||
# TODO Many more handlers
|
||||
|
||||
|
@ -83,7 +83,7 @@ class UserClient {
|
||||
this.peerClient = peerClient
|
||||
|
||||
this.#talkClient.on("chat", (data, channel) => {
|
||||
this.log(`Received chat message ${data.chat.logId} in channel ${channel.channelId}`)
|
||||
this.log(`${data.chat.logId} received in channel ${channel.channelId}`)
|
||||
return this.write("chat", {
|
||||
//is_sequential: true, // TODO Make sequential per user & channel (if it isn't already)
|
||||
chatlog: data.chat,
|
||||
|
Loading…
Reference in New Issue
Block a user