Compare commits
No commits in common. "e94c598e3b55e32097b2ea37a05fe76eed8986bb" and "3ced968494cbbdbb6aae027e06f674d8e41d6726" have entirely different histories.
e94c598e3b
...
3ced968494
@ -57,7 +57,6 @@ class KakaoTalkBridge(Bridge):
|
|||||||
|
|
||||||
def prepare_bridge(self) -> None:
|
def prepare_bridge(self) -> None:
|
||||||
super().prepare_bridge()
|
super().prepare_bridge()
|
||||||
""" TODO Implement web login
|
|
||||||
if self.config["appservice.public.enabled"]:
|
if self.config["appservice.public.enabled"]:
|
||||||
secret = self.config["appservice.public.shared_secret"]
|
secret = self.config["appservice.public.shared_secret"]
|
||||||
self.public_website = PublicBridgeWebsite(loop=self.loop, shared_secret=secret)
|
self.public_website = PublicBridgeWebsite(loop=self.loop, shared_secret=secret)
|
||||||
@ -66,21 +65,21 @@ class KakaoTalkBridge(Bridge):
|
|||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
self.public_website = None
|
self.public_website = None
|
||||||
"""
|
|
||||||
self.public_website = None
|
|
||||||
|
|
||||||
def prepare_stop(self) -> None:
|
def prepare_stop(self) -> None:
|
||||||
self.log.debug("Stopping RPC connection")
|
|
||||||
KakaoTalkClient.stop_cls()
|
|
||||||
self.log.debug("Stopping puppet syncers")
|
self.log.debug("Stopping puppet syncers")
|
||||||
for puppet in Puppet.by_custom_mxid.values():
|
for puppet in Puppet.by_custom_mxid.values():
|
||||||
puppet.stop()
|
puppet.stop()
|
||||||
self.log.debug("Stopping kakaotalk listeners")
|
self.log.debug("Stopping kakaotalk listeners")
|
||||||
User.shutdown = True
|
User.shutdown = True
|
||||||
self.add_shutdown_actions(user.save() for user in User.by_mxid.values())
|
self.add_shutdown_actions(user.save() for user in User.by_mxid.values())
|
||||||
|
self.add_shutdown_actions(KakaoTalkClient.stop_cls())
|
||||||
|
|
||||||
async def start(self) -> None:
|
async def start(self) -> None:
|
||||||
KakaoTalkClient.init_cls(self.config)
|
# Block all other startup actions until RPC is ready
|
||||||
|
# TODO Remove when/if node backend is replaced with native
|
||||||
|
await KakaoTalkClient.init_cls(self.config)
|
||||||
|
|
||||||
self.add_startup_actions(User.init_cls(self))
|
self.add_startup_actions(User.init_cls(self))
|
||||||
self.add_startup_actions(Puppet.init_cls(self))
|
self.add_startup_actions(Puppet.init_cls(self))
|
||||||
Portal.init_cls(self)
|
Portal.init_cls(self)
|
||||||
|
@ -23,6 +23,7 @@ from mautrix.types import UserID
|
|||||||
from mautrix.util.config import ConfigUpdateHelper, ForbiddenDefault, ForbiddenKey
|
from mautrix.util.config import ConfigUpdateHelper, ForbiddenDefault, ForbiddenKey
|
||||||
|
|
||||||
|
|
||||||
|
# TODO Remove unneeded configs!!
|
||||||
class Config(BaseBridgeConfig):
|
class Config(BaseBridgeConfig):
|
||||||
def __getitem__(self, key: str) -> Any:
|
def __getitem__(self, key: str) -> Any:
|
||||||
try:
|
try:
|
||||||
@ -93,15 +94,31 @@ class Config(BaseBridgeConfig):
|
|||||||
copy("bridge.backfill.initial_limit")
|
copy("bridge.backfill.initial_limit")
|
||||||
copy("bridge.backfill.missed_limit")
|
copy("bridge.backfill.missed_limit")
|
||||||
copy("bridge.backfill.disable_notifications")
|
copy("bridge.backfill.disable_notifications")
|
||||||
""" TODO
|
if "bridge.periodic_reconnect_interval" in self:
|
||||||
copy("bridge.periodic_reconnect.interval")
|
base["bridge.periodic_reconnect.interval"] = self["bridge.periodic_reconnect_interval"]
|
||||||
copy("bridge.periodic_reconnect.always")
|
base["bridge.periodic_reconnect.mode"] = self["bridge.periodic_reconnect_mode"]
|
||||||
copy("bridge.periodic_reconnect.min_connected_time")
|
else:
|
||||||
"""
|
copy("bridge.periodic_reconnect.interval")
|
||||||
|
copy("bridge.periodic_reconnect.mode")
|
||||||
|
copy("bridge.periodic_reconnect.always")
|
||||||
|
copy("bridge.periodic_reconnect.min_connected_time")
|
||||||
copy("bridge.resync_max_disconnected_time")
|
copy("bridge.resync_max_disconnected_time")
|
||||||
copy("bridge.sync_on_startup")
|
copy("bridge.sync_on_startup")
|
||||||
copy("bridge.temporary_disconnect_notices")
|
copy("bridge.temporary_disconnect_notices")
|
||||||
copy("bridge.disable_bridge_notices")
|
copy("bridge.disable_bridge_notices")
|
||||||
|
if "bridge.refresh_on_reconnection_fail" in self:
|
||||||
|
base["bridge.on_reconnection_fail.action"] = (
|
||||||
|
"refresh" if self["bridge.refresh_on_reconnection_fail"] else None
|
||||||
|
)
|
||||||
|
base["bridge.on_reconnection_fail.wait_for"] = 0
|
||||||
|
elif "bridge.on_reconnection_fail.refresh" in self:
|
||||||
|
base["bridge.on_reconnection_fail.action"] = (
|
||||||
|
"refresh" if self["bridge.on_reconnection_fail.refresh"] else None
|
||||||
|
)
|
||||||
|
copy("bridge.on_reconnection_fail.wait_for")
|
||||||
|
else:
|
||||||
|
copy("bridge.on_reconnection_fail.action")
|
||||||
|
copy("bridge.on_reconnection_fail.wait_for")
|
||||||
copy("bridge.resend_bridge_info")
|
copy("bridge.resend_bridge_info")
|
||||||
copy("bridge.mute_bridging")
|
copy("bridge.mute_bridging")
|
||||||
copy("bridge.tag_only_on_create")
|
copy("bridge.tag_only_on_create")
|
||||||
@ -109,14 +126,13 @@ class Config(BaseBridgeConfig):
|
|||||||
|
|
||||||
copy_dict("bridge.permissions")
|
copy_dict("bridge.permissions")
|
||||||
|
|
||||||
""" TODO
|
|
||||||
for key in (
|
for key in (
|
||||||
"bridge.periodic_reconnect.interval",
|
"bridge.periodic_reconnect.interval",
|
||||||
|
"bridge.on_reconnection_fail.wait_for",
|
||||||
):
|
):
|
||||||
value = base.get(key, None)
|
value = base.get(key, None)
|
||||||
if isinstance(value, list) and len(value) != 2:
|
if isinstance(value, list) and len(value) != 2:
|
||||||
raise ValueError(f"{key} must only be a list of two items")
|
raise ValueError(f"{key} must only be a list of two items")
|
||||||
"""
|
|
||||||
|
|
||||||
copy("rpc.connection.type")
|
copy("rpc.connection.type")
|
||||||
if base["rpc.connection.type"] == "unix":
|
if base["rpc.connection.type"] == "unix":
|
||||||
|
@ -50,7 +50,6 @@ appservice:
|
|||||||
max_size: 10
|
max_size: 10
|
||||||
|
|
||||||
# Public part of web server for out-of-Matrix interaction with the bridge.
|
# Public part of web server for out-of-Matrix interaction with the bridge.
|
||||||
# TODO Implement web login
|
|
||||||
public:
|
public:
|
||||||
# Whether or not the public-facing endpoints should be enabled.
|
# Whether or not the public-facing endpoints should be enabled.
|
||||||
enabled: false
|
enabled: false
|
||||||
@ -201,13 +200,15 @@ bridge:
|
|||||||
# If using double puppeting, should notifications be disabled
|
# If using double puppeting, should notifications be disabled
|
||||||
# while the initial backfill is in progress?
|
# while the initial backfill is in progress?
|
||||||
disable_notifications: false
|
disable_notifications: false
|
||||||
# TODO Implement this
|
# TODO Confirm this isn't needed
|
||||||
#periodic_reconnect:
|
#periodic_reconnect:
|
||||||
# # Interval in seconds in which to automatically reconnect all users.
|
# # Interval in seconds in which to automatically reconnect all users.
|
||||||
# # This may prevent KakaoTalk from "switching servers".
|
# # This can be used to automatically mitigate the bug where KakaoTalk stops sending messages.
|
||||||
# # Set to -1 to disable periodic reconnections entirely.
|
# # Set to -1 to disable periodic reconnections entirely.
|
||||||
# # Set to a list of two items to randomize the interval (min, max).
|
# # Set to a list of two items to randomize the interval (min, max).
|
||||||
# interval: -1
|
# interval: -1
|
||||||
|
# # What to do in periodic reconnects. Either "refresh" or "reconnect"
|
||||||
|
# mode: refresh
|
||||||
# # Should even disconnected users be reconnected?
|
# # Should even disconnected users be reconnected?
|
||||||
# always: false
|
# always: false
|
||||||
# # Only reconnect if the user has been connected for longer than this value
|
# # Only reconnect if the user has been connected for longer than this value
|
||||||
@ -215,7 +216,6 @@ bridge:
|
|||||||
# The number of seconds that a disconnection can last without triggering an automatic re-sync
|
# The number of seconds that a disconnection can last without triggering an automatic re-sync
|
||||||
# and missed message backfilling when reconnecting.
|
# and missed message backfilling when reconnecting.
|
||||||
# Set to 0 to always re-sync, or -1 to never re-sync automatically.
|
# Set to 0 to always re-sync, or -1 to never re-sync automatically.
|
||||||
# TODO Actually use this setting
|
|
||||||
resync_max_disconnected_time: 5
|
resync_max_disconnected_time: 5
|
||||||
# Should the bridge do a resync on startup?
|
# Should the bridge do a resync on startup?
|
||||||
sync_on_startup: true
|
sync_on_startup: true
|
||||||
|
@ -16,12 +16,11 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from typing import Match
|
from typing import Match
|
||||||
|
from html import escape
|
||||||
import re
|
import re
|
||||||
|
|
||||||
from mautrix.types import Format, MessageType, TextMessageEventContent
|
from mautrix.types import Format, MessageType, TextMessageEventContent
|
||||||
|
|
||||||
from ..kt.types.chat.attachment.mention import MentionStruct
|
|
||||||
|
|
||||||
from .. import puppet as pu, user as u
|
from .. import puppet as pu, user as u
|
||||||
|
|
||||||
_START = r"^|\s"
|
_START = r"^|\s"
|
||||||
@ -80,38 +79,92 @@ def _handle_blockquote(output: list[str], blockquote: bool, line: str) -> tuple[
|
|||||||
return blockquote, line
|
return blockquote, line
|
||||||
|
|
||||||
|
|
||||||
async def kakaotalk_to_matrix(msg: str | None, mentions: list[MentionStruct] | None) -> TextMessageEventContent:
|
def _handle_codeblock_pre(
|
||||||
# TODO Shouts
|
output: list[str], codeblock: bool, line: str
|
||||||
text = msg or ""
|
) -> tuple[bool, str, tuple[str | None, str | None, str | None]]:
|
||||||
content = TextMessageEventContent(msgtype=MessageType.TEXT, body=text)
|
cb = line.find("```")
|
||||||
|
cb_lang = None
|
||||||
if mentions:
|
cb_content = None
|
||||||
mention_user_ids = []
|
post_cb_content = None
|
||||||
at_chunks = text.split("@")
|
if cb != -1:
|
||||||
for m in mentions:
|
if not codeblock:
|
||||||
for idx in m.at:
|
cb_lang = line[cb + 3 :]
|
||||||
chunk = at_chunks[idx]
|
if "```" in cb_lang:
|
||||||
original = chunk[:m.len]
|
end = cb_lang.index("```")
|
||||||
mention_user_ids.append(int(m.user_id))
|
cb_content = cb_lang[:end]
|
||||||
at_chunks[idx] = f"{m.user_id}\u2063{original}\u2063{chunk[m.len:]}"
|
post_cb_content = cb_lang[end + 3 :]
|
||||||
text = "@".join(at_chunks)
|
cb_lang = ""
|
||||||
|
|
||||||
mention_user_map = {}
|
|
||||||
for ktid in mention_user_ids:
|
|
||||||
user = await u.User.get_by_ktid(ktid)
|
|
||||||
if user:
|
|
||||||
mention_user_map[ktid] = user.mxid
|
|
||||||
else:
|
else:
|
||||||
puppet = await pu.Puppet.get_by_ktid(ktid, create=False)
|
codeblock = True
|
||||||
mention_user_map[ktid] = puppet.mxid if puppet else None
|
line = line[:cb]
|
||||||
|
else:
|
||||||
|
output.append("</code></pre>")
|
||||||
|
codeblock = False
|
||||||
|
line = line[cb + 3 :]
|
||||||
|
return codeblock, line, (cb_lang, cb_content, post_cb_content)
|
||||||
|
|
||||||
if mention_user_map:
|
|
||||||
def _mention_replacer(match: Match) -> str:
|
|
||||||
mxid = mention_user_map[int(match.group(1))]
|
|
||||||
if not mxid:
|
|
||||||
return match.group(2)
|
|
||||||
return f'<a href="https://matrix.to/#/{mxid}">{match.group(2)}</a>'
|
|
||||||
|
|
||||||
content.format = Format.HTML
|
def _handle_codeblock_post(
|
||||||
content.formatted_body = MENTION_REGEX.sub(_mention_replacer, text).replace("\n", "<br/>\n")
|
output: list[str], cb_lang: str | None, cb_content: str | None, post_cb_content: str | None
|
||||||
|
) -> None:
|
||||||
|
if cb_lang is not None:
|
||||||
|
if cb_lang:
|
||||||
|
output.append(f'<pre><code class="language-{cb_lang}">')
|
||||||
|
else:
|
||||||
|
output.append("<pre><code>")
|
||||||
|
if cb_content:
|
||||||
|
output.append(cb_content)
|
||||||
|
output.append("</code></pre>")
|
||||||
|
output.append(_convert_formatting(post_cb_content))
|
||||||
|
|
||||||
|
|
||||||
|
async def kakaotalk_to_matrix(msg: str) -> TextMessageEventContent:
|
||||||
|
text = msg or ""
|
||||||
|
mentions = []
|
||||||
|
content = TextMessageEventContent(msgtype=MessageType.TEXT, body=text)
|
||||||
|
mention_user_ids = []
|
||||||
|
for m in reversed(mentions):
|
||||||
|
original = text[m.offset : m.offset + m.length]
|
||||||
|
if len(original) > 0 and original[0] == "@":
|
||||||
|
original = original[1:]
|
||||||
|
mention_user_ids.append(int(m.user_id))
|
||||||
|
text = f"{text[:m.offset]}@{m.user_id}\u2063{original}\u2063{text[m.offset + m.length:]}"
|
||||||
|
html = escape(text)
|
||||||
|
output = []
|
||||||
|
if html:
|
||||||
|
codeblock = False
|
||||||
|
blockquote = False
|
||||||
|
line: str
|
||||||
|
lines = html.split("\n")
|
||||||
|
for i, line in enumerate(lines):
|
||||||
|
blockquote, line = _handle_blockquote(output, blockquote, line)
|
||||||
|
codeblock, line, post_args = _handle_codeblock_pre(output, codeblock, line)
|
||||||
|
output.append(_convert_formatting(line))
|
||||||
|
if i != len(lines) - 1:
|
||||||
|
if codeblock:
|
||||||
|
output.append("\n")
|
||||||
|
else:
|
||||||
|
output.append("<br/>")
|
||||||
|
_handle_codeblock_post(output, *post_args)
|
||||||
|
html = "".join(output)
|
||||||
|
|
||||||
|
mention_user_map = {}
|
||||||
|
for ktid in mention_user_ids:
|
||||||
|
user = await u.User.get_by_ktid(ktid)
|
||||||
|
if user:
|
||||||
|
mention_user_map[ktid] = user.mxid
|
||||||
|
else:
|
||||||
|
puppet = await pu.Puppet.get_by_ktid(ktid, create=False)
|
||||||
|
mention_user_map[ktid] = puppet.mxid if puppet else None
|
||||||
|
|
||||||
|
def _mention_replacer(match: Match) -> str:
|
||||||
|
mxid = mention_user_map[int(match.group(1))]
|
||||||
|
if not mxid:
|
||||||
|
return match.group(2)
|
||||||
|
return f'<a href="https://matrix.to/#/{mxid}">{match.group(2)}</a>'
|
||||||
|
|
||||||
|
html = MENTION_REGEX.sub(_mention_replacer, html)
|
||||||
|
if html != escape(content.body).replace("\n", "<br/>\n"):
|
||||||
|
content.format = Format.HTML
|
||||||
|
content.formatted_body = html
|
||||||
return content
|
return content
|
||||||
|
@ -17,35 +17,29 @@ from __future__ import annotations
|
|||||||
|
|
||||||
from typing import NamedTuple
|
from typing import NamedTuple
|
||||||
|
|
||||||
from mautrix.appservice import IntentAPI
|
from mautrix.types import Format, MessageEventContent, RelationType, RoomID
|
||||||
from mautrix.types import Format, MessageEventContent, RelationType, RoomID, UserID
|
|
||||||
from mautrix.util.formatter import (
|
from mautrix.util.formatter import (
|
||||||
EntityString,
|
EntityString,
|
||||||
EntityType,
|
EntityType,
|
||||||
MarkdownString,
|
MarkdownString,
|
||||||
MatrixParser,
|
MatrixParser as BaseMatrixParser,
|
||||||
SimpleEntity,
|
SimpleEntity,
|
||||||
)
|
)
|
||||||
from mautrix.util.logging import TraceLogger
|
from mautrix.util.logging import TraceLogger
|
||||||
|
|
||||||
from ..kt.types.bson import Long
|
|
||||||
from ..kt.types.chat import KnownChatType
|
|
||||||
from ..kt.types.chat.attachment import ReplyAttachment, MentionStruct
|
|
||||||
|
|
||||||
from ..kt.client.types import TO_MSGTYPE_MAP
|
|
||||||
|
|
||||||
from .. import puppet as pu, user as u
|
from .. import puppet as pu, user as u
|
||||||
from ..db import Message as DBMessage
|
from ..db import Message as DBMessage
|
||||||
|
|
||||||
|
|
||||||
class SendParams(NamedTuple):
|
class SendParams(NamedTuple):
|
||||||
text: str
|
text: str
|
||||||
mentions: list[MentionStruct] | None
|
mentions: list[None]
|
||||||
reply_to: ReplyAttachment
|
reply_to: str
|
||||||
|
|
||||||
|
|
||||||
class KakaoTalkFormatString(EntityString[SimpleEntity, EntityType], MarkdownString):
|
class FacebookFormatString(EntityString[SimpleEntity, EntityType], MarkdownString):
|
||||||
def format(self, entity_type: EntityType, **kwargs) -> KakaoTalkFormatString:
|
def format(self, entity_type: EntityType, **kwargs) -> FacebookFormatString:
|
||||||
|
prefix = suffix = ""
|
||||||
if entity_type == EntityType.USER_MENTION:
|
if entity_type == EntityType.USER_MENTION:
|
||||||
self.entities.append(
|
self.entities.append(
|
||||||
SimpleEntity(
|
SimpleEntity(
|
||||||
@ -55,110 +49,72 @@ class KakaoTalkFormatString(EntityString[SimpleEntity, EntityType], MarkdownStri
|
|||||||
extra_info={"user_id": kwargs["user_id"]},
|
extra_info={"user_id": kwargs["user_id"]},
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
self.text = f"@{self.text}"
|
return self
|
||||||
|
elif entity_type == EntityType.BOLD:
|
||||||
|
prefix = suffix = "*"
|
||||||
|
elif entity_type == EntityType.ITALIC:
|
||||||
|
prefix = suffix = "_"
|
||||||
|
elif entity_type == EntityType.STRIKETHROUGH:
|
||||||
|
prefix = suffix = "~"
|
||||||
|
elif entity_type == EntityType.URL:
|
||||||
|
if kwargs["url"] != self.text:
|
||||||
|
suffix = f" ({kwargs['url']})"
|
||||||
|
elif entity_type == EntityType.PREFORMATTED:
|
||||||
|
prefix = f"```{kwargs['language']}\n"
|
||||||
|
suffix = "\n```"
|
||||||
|
elif entity_type == EntityType.INLINE_CODE:
|
||||||
|
prefix = suffix = "`"
|
||||||
|
elif entity_type == EntityType.BLOCKQUOTE:
|
||||||
|
children = self.trim().split("\n")
|
||||||
|
children = [child.prepend("> ") for child in children]
|
||||||
|
return self.join(children, "\n")
|
||||||
|
elif entity_type == EntityType.HEADER:
|
||||||
|
prefix = "#" * kwargs["size"] + " "
|
||||||
|
else:
|
||||||
|
return self
|
||||||
|
|
||||||
|
self._offset_entities(len(prefix))
|
||||||
|
self.text = f"{prefix}{self.text}{suffix}"
|
||||||
return self
|
return self
|
||||||
|
|
||||||
|
|
||||||
class ToKakaoTalkParser(MatrixParser[KakaoTalkFormatString]):
|
class MatrixParser(BaseMatrixParser[FacebookFormatString]):
|
||||||
fs = KakaoTalkFormatString
|
fs = FacebookFormatString
|
||||||
|
|
||||||
|
|
||||||
async def _get_id_from_mxid(mxid: UserID) -> Long | None:
|
|
||||||
user = await u.User.get_by_mxid(mxid, create=False)
|
|
||||||
if user and user.ktid:
|
|
||||||
return user.ktid
|
|
||||||
else:
|
|
||||||
puppet = await pu.Puppet.get_by_mxid(mxid, create=False)
|
|
||||||
return puppet.ktid if puppet else None
|
|
||||||
|
|
||||||
|
|
||||||
async def matrix_to_kakaotalk(
|
async def matrix_to_kakaotalk(
|
||||||
content: MessageEventContent,
|
content: MessageEventContent, room_id: RoomID, log: TraceLogger
|
||||||
room_id: RoomID,
|
|
||||||
log: TraceLogger,
|
|
||||||
intent: IntentAPI,
|
|
||||||
skip_reply: bool = False
|
|
||||||
) -> SendParams:
|
) -> SendParams:
|
||||||
# NOTE By design, this *throws* if user intent can't be matched (i.e. if a reply can't be created)
|
mentions = []
|
||||||
if content.relates_to.rel_type == RelationType.REPLY and not skip_reply:
|
reply_to = None
|
||||||
|
if content.relates_to.rel_type == RelationType.REPLY:
|
||||||
message = await DBMessage.get_by_mxid(content.relates_to.event_id, room_id)
|
message = await DBMessage.get_by_mxid(content.relates_to.event_id, room_id)
|
||||||
if not message:
|
if message:
|
||||||
raise ValueError(
|
content.trim_reply_fallback()
|
||||||
f"Couldn't find reply target {content.relates_to.event_id}"
|
reply_to = message.ktid
|
||||||
" to bridge text message reply metadata to KakaoTalk"
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
src_event = await intent.get_event(room_id, message.mxid)
|
|
||||||
except:
|
|
||||||
log.exception(f"Failed to find Matrix event for reply target {message.mxid}")
|
|
||||||
raise
|
|
||||||
src_kt_sender = await _get_id_from_mxid(src_event.sender)
|
|
||||||
if src_kt_sender is None:
|
|
||||||
raise ValueError(
|
|
||||||
f"Found no KakaoTalk user ID for reply target sender {src_event.sender}"
|
|
||||||
)
|
|
||||||
content.trim_reply_fallback()
|
|
||||||
src_converted = await matrix_to_kakaotalk(src_event.content, room_id, log, intent, skip_reply=True)
|
|
||||||
if src_event.content.relates_to.rel_type == RelationType.REPLY:
|
|
||||||
src_type = KnownChatType.REPLY
|
|
||||||
src_message = src_converted.text
|
|
||||||
else:
|
else:
|
||||||
src_type = TO_MSGTYPE_MAP[src_event.content.msgtype]
|
log.warning(
|
||||||
if src_type == KnownChatType.FILE:
|
f"Couldn't find reply target {content.relates_to.event_id}"
|
||||||
src_message = _media_type_reply_body_map[KnownChatType.FILE] + src_converted.text
|
" to bridge text message reply metadata to Facebook"
|
||||||
else:
|
)
|
||||||
src_message = _media_type_reply_body_map.get(src_type, src_converted.text)
|
if content.get("format", None) == Format.HTML and content["formatted_body"]:
|
||||||
reply_to = ReplyAttachment(
|
parsed = await MatrixParser().parse(content["formatted_body"])
|
||||||
# NOTE mentions will be merged into this later
|
|
||||||
# TODO Set this for emoticon reply, but must first support them
|
|
||||||
attach_only=False,
|
|
||||||
# TODO If replying with media works, must set type AND all attachment properties
|
|
||||||
# But then, the reply object must be an intersection of a ReplyAttachment and something else
|
|
||||||
#attach_type=TO_MSGTYPE_MAP.get(content.msgtype),
|
|
||||||
# TODO Confirm why official client sets this to 0, and whether this should be left as None instead
|
|
||||||
attach_type=0,
|
|
||||||
src_logId=message.ktid,
|
|
||||||
src_mentions=src_converted.mentions or [],
|
|
||||||
src_message=src_message,
|
|
||||||
src_type=src_type,
|
|
||||||
src_userId=src_kt_sender,
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
reply_to = None
|
|
||||||
if content.get("format", None) == Format.HTML and content["formatted_body"] and content.msgtype.is_text:
|
|
||||||
parsed = await ToKakaoTalkParser().parse(content["formatted_body"])
|
|
||||||
text = parsed.text
|
text = parsed.text
|
||||||
mentions_by_user: dict[Long, MentionStruct] = {}
|
mentions = []
|
||||||
# Make sure to not create remote mentions for any remote user not in the room
|
for mention in parsed.entities:
|
||||||
if parsed.entities:
|
mxid = mention.extra_info["user_id"]
|
||||||
joined_members = set(await intent.get_room_members(room_id))
|
user = await u.User.get_by_mxid(mxid, create=False)
|
||||||
last_offset = 0
|
if user and user.ktid:
|
||||||
at = 0
|
ktid = user.ktid
|
||||||
for mention in sorted(parsed.entities, key=lambda entity: entity.offset):
|
else:
|
||||||
mxid = mention.extra_info["user_id"]
|
puppet = await pu.Puppet.get_by_mxid(mxid, create=False)
|
||||||
if mxid not in joined_members:
|
if puppet:
|
||||||
|
ktid = puppet.ktid
|
||||||
|
else:
|
||||||
continue
|
continue
|
||||||
ktid = await _get_id_from_mxid(mxid)
|
#mentions.append(
|
||||||
if ktid is None:
|
# Mention(user_id=str(ktid), offset=mention.offset, length=mention.length)
|
||||||
continue
|
#)
|
||||||
at += text[last_offset:mention.offset+1].count("@")
|
|
||||||
last_offset = mention.offset+1
|
|
||||||
mention = mentions_by_user.setdefault(ktid, MentionStruct(
|
|
||||||
at=[],
|
|
||||||
len=mention.length,
|
|
||||||
user_id=ktid,
|
|
||||||
))
|
|
||||||
mention.at.append(at)
|
|
||||||
mentions = list(mentions_by_user.values()) if mentions_by_user else None
|
|
||||||
else:
|
else:
|
||||||
text = content.body
|
text = content.body
|
||||||
mentions = None
|
|
||||||
return SendParams(text=text, mentions=mentions, reply_to=reply_to)
|
return SendParams(text=text, mentions=mentions, reply_to=reply_to)
|
||||||
|
|
||||||
|
|
||||||
_media_type_reply_body_map: dict[KnownChatType, str] = {
|
|
||||||
KnownChatType.PHOTO: "Photo",
|
|
||||||
KnownChatType.VIDEO: "Video",
|
|
||||||
KnownChatType.AUDIO: "Voice Note",
|
|
||||||
KnownChatType.FILE: "File: ",
|
|
||||||
}
|
|
||||||
|
@ -22,8 +22,7 @@ with any other potential backend.
|
|||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from typing import TYPE_CHECKING, cast, ClassVar, Type, Optional, Union
|
from typing import TYPE_CHECKING, cast, Type, Optional, Union
|
||||||
import asyncio
|
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
import logging
|
import logging
|
||||||
import urllib.request
|
import urllib.request
|
||||||
@ -42,7 +41,6 @@ from ..types.api.struct import FriendListStruct
|
|||||||
from ..types.bson import Long
|
from ..types.bson import Long
|
||||||
from ..types.client.client_session import LoginResult
|
from ..types.client.client_session import LoginResult
|
||||||
from ..types.chat import Chatlog, KnownChatType
|
from ..types.chat import Chatlog, KnownChatType
|
||||||
from ..types.chat.attachment import MentionStruct, ReplyAttachment
|
|
||||||
from ..types.oauth import OAuthCredential, OAuthInfo
|
from ..types.oauth import OAuthCredential, OAuthInfo
|
||||||
from ..types.packet.chat.kickout import KnownKickoutType, KickoutRes
|
from ..types.packet.chat.kickout import KnownKickoutType, KickoutRes
|
||||||
from ..types.request import (
|
from ..types.request import (
|
||||||
@ -65,7 +63,7 @@ except ImportError:
|
|||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from mautrix.types import JSON
|
from mautrix.types import JSON
|
||||||
from ... import user as u
|
from ...user import User
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
@ -80,22 +78,15 @@ class Client:
|
|||||||
_rpc_client: RPCClient
|
_rpc_client: RPCClient
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def init_cls(cls, config: Config) -> None:
|
async def init_cls(cls, config: Config) -> None:
|
||||||
"""Initialize RPC to the Node backend."""
|
"""Initialize RPC to the Node backend."""
|
||||||
cls._rpc_client = RPCClient(config)
|
cls._rpc_client = RPCClient(config)
|
||||||
# NOTE No need to store this, as cancelling the RPCClient will cancel this too
|
await cls._rpc_client.connect()
|
||||||
asyncio.create_task(cls._keep_connected())
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def _keep_connected(cls) -> None:
|
async def stop_cls(cls) -> None:
|
||||||
while True:
|
|
||||||
await cls._rpc_client.connect()
|
|
||||||
await cls._rpc_client.wait_for_disconnection()
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def stop_cls(cls) -> None:
|
|
||||||
"""Stop and disconnect from the Node backend."""
|
"""Stop and disconnect from the Node backend."""
|
||||||
cls._rpc_client.cancel()
|
await cls._rpc_client.disconnect()
|
||||||
|
|
||||||
|
|
||||||
# region tokenless commands
|
# region tokenless commands
|
||||||
@ -132,15 +123,12 @@ class Client:
|
|||||||
# endregion
|
# endregion
|
||||||
|
|
||||||
|
|
||||||
user: u.User
|
|
||||||
_rpc_disconnection_task: asyncio.Task | None
|
|
||||||
http: ClientSession
|
http: ClientSession
|
||||||
log: TraceLogger
|
log: TraceLogger
|
||||||
|
|
||||||
def __init__(self, user: u.User, log: Optional[TraceLogger] = None):
|
def __init__(self, user: User, log: Optional[TraceLogger] = None):
|
||||||
"""Create a per-user client object for user-specific client functionality."""
|
"""Create a per-user client object for user-specific client functionality."""
|
||||||
self.user = user
|
self.user = user
|
||||||
self._rpc_disconnection_task = None
|
|
||||||
|
|
||||||
# TODO Let the Node backend use a proxy too!
|
# TODO Let the Node backend use a proxy too!
|
||||||
connector = None
|
connector = None
|
||||||
@ -199,27 +187,13 @@ class Client:
|
|||||||
Receive the user's profile info in response.
|
Receive the user's profile info in response.
|
||||||
"""
|
"""
|
||||||
profile_req_struct = await self._api_user_request_result(ProfileReqStruct, "start")
|
profile_req_struct = await self._api_user_request_result(ProfileReqStruct, "start")
|
||||||
if not self._rpc_disconnection_task:
|
|
||||||
self._rpc_disconnection_task = asyncio.create_task(self._rpc_disconnection_handler())
|
|
||||||
else:
|
|
||||||
self.log.warning("Called \"start\" on an already-started client")
|
|
||||||
return profile_req_struct.profile
|
return profile_req_struct.profile
|
||||||
|
|
||||||
async def stop(self) -> None:
|
async def stop(self) -> None:
|
||||||
"""Immediately stop bridging this user."""
|
"""Immediately stop bridging this user."""
|
||||||
self._stop_listen()
|
self._stop_listen()
|
||||||
if self._rpc_disconnection_task:
|
|
||||||
self._rpc_disconnection_task.cancel()
|
|
||||||
else:
|
|
||||||
self.log.warning("Called \"stop\" on an already-stopped client")
|
|
||||||
await self._rpc_client.request("stop", mxid=self.user.mxid)
|
await self._rpc_client.request("stop", mxid=self.user.mxid)
|
||||||
|
|
||||||
async def _rpc_disconnection_handler(self) -> None:
|
|
||||||
await self._rpc_client.wait_for_disconnection()
|
|
||||||
self._rpc_disconnection_task = None
|
|
||||||
self._stop_listen()
|
|
||||||
asyncio.create_task(self.user.on_client_disconnect())
|
|
||||||
|
|
||||||
async def renew_and_save(self) -> None:
|
async def renew_and_save(self) -> None:
|
||||||
"""Renew and save the user's session tokens."""
|
"""Renew and save the user's session tokens."""
|
||||||
oauth_info = await self._api_request_result(OAuthInfo, "renew", oauth_credential=self._oauth_credential)
|
oauth_info = await self._api_request_result(OAuthInfo, "renew", oauth_credential=self._oauth_credential)
|
||||||
@ -299,20 +273,12 @@ class Client:
|
|||||||
await self._rpc_client.request("get_memo_ids", mxid=self.user.mxid)
|
await self._rpc_client.request("get_memo_ids", mxid=self.user.mxid)
|
||||||
)
|
)
|
||||||
|
|
||||||
async def send_message(
|
async def send_message(self, channel_props: ChannelProps, text: str) -> Chatlog:
|
||||||
self,
|
|
||||||
channel_props: ChannelProps,
|
|
||||||
text: str,
|
|
||||||
reply_to: ReplyAttachment | None,
|
|
||||||
mentions: list[MentionStruct] | None,
|
|
||||||
) -> Chatlog:
|
|
||||||
return await self._api_user_request_result(
|
return await self._api_user_request_result(
|
||||||
Chatlog,
|
Chatlog,
|
||||||
"send_chat",
|
"send_chat",
|
||||||
channel_props=channel_props.serialize(),
|
channel_props=channel_props.serialize(),
|
||||||
text=text,
|
text=text,
|
||||||
reply_to=reply_to.serialize() if reply_to is not None else None,
|
|
||||||
mentions=[m.serialize() for m in mentions] if mentions is not None else None,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
async def send_media(
|
async def send_media(
|
||||||
|
@ -25,11 +25,11 @@ from .mention import MentionStruct
|
|||||||
|
|
||||||
@dataclass(kw_only=True)
|
@dataclass(kw_only=True)
|
||||||
class ReplyAttachment(Attachment):
|
class ReplyAttachment(Attachment):
|
||||||
attach_only: bool = None # NOTE Made optional
|
attach_only: bool
|
||||||
attach_type: Optional[ChatType] = None # NOTE Changed from int for outgoing typeless replies
|
attach_type: int
|
||||||
src_linkId: Optional[Long] = None
|
src_linkId: Optional[Long] = None
|
||||||
src_logId: Long
|
src_logId: Long
|
||||||
src_mentions: list[MentionStruct] = None # NOTE Made optional
|
src_mentions: list[MentionStruct]
|
||||||
src_message: str
|
src_message: str
|
||||||
src_type: ChatType
|
src_type: ChatType
|
||||||
src_userId: Long
|
src_userId: Long
|
||||||
|
@ -94,12 +94,15 @@ ResultType = TypeVar("ResultType", bound=Serializable)
|
|||||||
|
|
||||||
def ResultListType(result_type: Type[ResultType]):
|
def ResultListType(result_type: Type[ResultType]):
|
||||||
class _ResultListType(list[result_type], Serializable):
|
class _ResultListType(list[result_type], Serializable):
|
||||||
|
def __init__(self, iterable: Iterable[result_type]=()):
|
||||||
|
list.__init__(self, (result_type.deserialize(x) for x in iterable))
|
||||||
|
|
||||||
def serialize(self) -> list[JSON]:
|
def serialize(self) -> list[JSON]:
|
||||||
return [v.serialize() for v in self]
|
return [v.serialize() for v in self]
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def deserialize(cls, data: list[JSON]) -> "_ResultListType":
|
def deserialize(cls, data: list[JSON]) -> "_ResultListType":
|
||||||
return [result_type.deserialize(item) for item in data]
|
return cls(data)
|
||||||
|
|
||||||
return _ResultListType
|
return _ResultListType
|
||||||
|
|
||||||
|
@ -46,7 +46,6 @@ from mautrix.types import (
|
|||||||
Membership,
|
Membership,
|
||||||
MessageEventContent,
|
MessageEventContent,
|
||||||
MessageType,
|
MessageType,
|
||||||
RelationType,
|
|
||||||
RoomID,
|
RoomID,
|
||||||
TextMessageEventContent,
|
TextMessageEventContent,
|
||||||
UserID,
|
UserID,
|
||||||
@ -69,7 +68,6 @@ from .kt.types.channel.channel_info import ChannelInfo
|
|||||||
from .kt.types.channel.channel_type import KnownChannelType, ChannelType
|
from .kt.types.channel.channel_type import KnownChannelType, ChannelType
|
||||||
from .kt.types.chat import Chatlog, ChatType, KnownChatType
|
from .kt.types.chat import Chatlog, ChatType, KnownChatType
|
||||||
from .kt.types.chat.attachment import (
|
from .kt.types.chat.attachment import (
|
||||||
Attachment,
|
|
||||||
AudioAttachment,
|
AudioAttachment,
|
||||||
#FileAttachment,
|
#FileAttachment,
|
||||||
MediaAttachment,
|
MediaAttachment,
|
||||||
@ -772,8 +770,6 @@ class Portal(DBPortal, BasePortal):
|
|||||||
) -> None:
|
) -> None:
|
||||||
if message.get_edit():
|
if message.get_edit():
|
||||||
raise NotImplementedError("Edits are not supported by the KakaoTalk bridge.")
|
raise NotImplementedError("Edits are not supported by the KakaoTalk bridge.")
|
||||||
if message.relates_to.rel_type == RelationType.REPLY and not message.msgtype.is_text:
|
|
||||||
raise NotImplementedError("Replying with non-text content is not supported by the KakaoTalk bridge.")
|
|
||||||
sender, is_relay = await self.get_relay_sender(orig_sender, f"message {event_id}")
|
sender, is_relay = await self.get_relay_sender(orig_sender, f"message {event_id}")
|
||||||
if not sender:
|
if not sender:
|
||||||
raise Exception("not logged in")
|
raise Exception("not logged in")
|
||||||
@ -806,13 +802,14 @@ class Portal(DBPortal, BasePortal):
|
|||||||
async def _handle_matrix_text(
|
async def _handle_matrix_text(
|
||||||
self, event_id: EventID, sender: u.User, message: TextMessageEventContent
|
self, event_id: EventID, sender: u.User, message: TextMessageEventContent
|
||||||
) -> None:
|
) -> None:
|
||||||
converted = await matrix_to_kakaotalk(message, self.mxid, self.log, self.main_intent)
|
converted = await matrix_to_kakaotalk(message, self.mxid, self.log)
|
||||||
try:
|
try:
|
||||||
chatlog = await sender.client.send_message(
|
chatlog = await sender.client.send_message(
|
||||||
self.channel_props,
|
self.channel_props,
|
||||||
text=converted.text,
|
text=converted.text,
|
||||||
reply_to=converted.reply_to,
|
# TODO
|
||||||
mentions=converted.mentions,
|
#mentions=converted.mentions,
|
||||||
|
#reply_to=converted.reply_to,
|
||||||
)
|
)
|
||||||
except CommandException as e:
|
except CommandException as e:
|
||||||
self.log.debug(f"Error handling Matrix message {event_id}: {e!s}")
|
self.log.debug(f"Error handling Matrix message {event_id}: {e!s}")
|
||||||
@ -840,6 +837,18 @@ class Portal(DBPortal, BasePortal):
|
|||||||
else:
|
else:
|
||||||
raise NotImplementedError("No file or URL specified")
|
raise NotImplementedError("No file or URL specified")
|
||||||
mimetype = message.info.mimetype or magic.mimetype(data)
|
mimetype = message.info.mimetype or magic.mimetype(data)
|
||||||
|
""" TODO Replies
|
||||||
|
reply_to = None
|
||||||
|
if message.relates_to.rel_type == RelationType.REPLY:
|
||||||
|
reply_to_msg = await DBMessage.get_by_mxid(message.relates_to.event_id, self.mxid)
|
||||||
|
if reply_to_msg:
|
||||||
|
reply_to = reply_to_msg.ktid
|
||||||
|
else:
|
||||||
|
self.log.warning(
|
||||||
|
f"Couldn't find reply target {message.relates_to.event_id}"
|
||||||
|
" to bridge media message reply metadata to KakaoTalk"
|
||||||
|
)
|
||||||
|
"""
|
||||||
filename = message.body
|
filename = message.body
|
||||||
width, height = None, None
|
width, height = None, None
|
||||||
if message.info in (MessageType.IMAGE, MessageType.STICKER, MessageType.VIDEO):
|
if message.info in (MessageType.IMAGE, MessageType.STICKER, MessageType.VIDEO):
|
||||||
@ -854,6 +863,8 @@ class Portal(DBPortal, BasePortal):
|
|||||||
width=width,
|
width=width,
|
||||||
height=height,
|
height=height,
|
||||||
ext=guess_extension(mimetype)[1:],
|
ext=guess_extension(mimetype)[1:],
|
||||||
|
# TODO
|
||||||
|
#reply_to=reply_to,
|
||||||
)
|
)
|
||||||
except CommandException as e:
|
except CommandException as e:
|
||||||
self.log.debug(f"Error uploading media for Matrix message {event_id}: {e!s}")
|
self.log.debug(f"Error uploading media for Matrix message {event_id}: {e!s}")
|
||||||
@ -1082,12 +1093,12 @@ class Portal(DBPortal, BasePortal):
|
|||||||
async def _handle_remote_text(
|
async def _handle_remote_text(
|
||||||
self,
|
self,
|
||||||
intent: IntentAPI,
|
intent: IntentAPI,
|
||||||
attachment: Attachment | None,
|
|
||||||
timestamp: int,
|
timestamp: int,
|
||||||
message_text: str | None,
|
message_text: str | None,
|
||||||
**_
|
**_
|
||||||
) -> list[EventID]:
|
) -> list[EventID]:
|
||||||
content = await kakaotalk_to_matrix(message_text, attachment.mentions if attachment else None)
|
# TODO Handle mentions properly
|
||||||
|
content = await kakaotalk_to_matrix(message_text)
|
||||||
return [await self._send_message(intent, content, timestamp=timestamp)]
|
return [await self._send_message(intent, content, timestamp=timestamp)]
|
||||||
|
|
||||||
async def _handle_remote_reply(
|
async def _handle_remote_reply(
|
||||||
@ -1098,7 +1109,7 @@ class Portal(DBPortal, BasePortal):
|
|||||||
message_text: str,
|
message_text: str,
|
||||||
**_
|
**_
|
||||||
) -> list[EventID]:
|
) -> list[EventID]:
|
||||||
content = await kakaotalk_to_matrix(message_text, attachment.mentions)
|
content = await kakaotalk_to_matrix(message_text)
|
||||||
await self._add_remote_reply(content, attachment)
|
await self._add_remote_reply(content, attachment)
|
||||||
return [await self._send_message(intent, content, timestamp=timestamp)]
|
return [await self._send_message(intent, content, timestamp=timestamp)]
|
||||||
|
|
||||||
|
@ -29,46 +29,6 @@ from .types import RPCError
|
|||||||
EventHandler = Callable[[dict[str, Any]], Awaitable[None]]
|
EventHandler = Callable[[dict[str, Any]], Awaitable[None]]
|
||||||
|
|
||||||
|
|
||||||
class CancelableEvent:
|
|
||||||
_event: asyncio.Event
|
|
||||||
_task: asyncio.Task | None
|
|
||||||
_cancelled: bool
|
|
||||||
_loop: asyncio.AbstractEventLoop
|
|
||||||
|
|
||||||
def __init__(self, loop: asyncio.AbstractEventLoop | None):
|
|
||||||
self._event = asyncio.Event()
|
|
||||||
self._task = None
|
|
||||||
self._cancelled = False
|
|
||||||
self._loop = loop or asyncio.get_running_loop()
|
|
||||||
|
|
||||||
def is_set(self) -> bool:
|
|
||||||
return self._event.is_set()
|
|
||||||
|
|
||||||
def set(self) -> None:
|
|
||||||
self._event.set()
|
|
||||||
self._task = None
|
|
||||||
|
|
||||||
def clear(self) -> None:
|
|
||||||
self._event.clear()
|
|
||||||
|
|
||||||
async def wait(self) -> None:
|
|
||||||
if self._cancelled:
|
|
||||||
raise asyncio.CancelledError()
|
|
||||||
if self._event.is_set():
|
|
||||||
return
|
|
||||||
if not self._task:
|
|
||||||
self._task = asyncio.create_task(self._event.wait())
|
|
||||||
await self._task
|
|
||||||
|
|
||||||
def cancel(self) -> None:
|
|
||||||
self._cancelled = True
|
|
||||||
if self._task is not None:
|
|
||||||
self._task.cancel()
|
|
||||||
|
|
||||||
def cancelled(self) -> bool:
|
|
||||||
return self._cancelled
|
|
||||||
|
|
||||||
|
|
||||||
class RPCClient:
|
class RPCClient:
|
||||||
config: Config
|
config: Config
|
||||||
loop: asyncio.AbstractEventLoop
|
loop: asyncio.AbstractEventLoop
|
||||||
@ -81,11 +41,6 @@ class RPCClient:
|
|||||||
_response_waiters: dict[int, asyncio.Future[JSON]]
|
_response_waiters: dict[int, asyncio.Future[JSON]]
|
||||||
_event_handlers: dict[str, list[EventHandler]]
|
_event_handlers: dict[str, list[EventHandler]]
|
||||||
_command_queue: asyncio.Queue
|
_command_queue: asyncio.Queue
|
||||||
_read_task: asyncio.Task | None
|
|
||||||
_connection_task: asyncio.Task | None
|
|
||||||
_is_connected: CancelableEvent
|
|
||||||
_is_disconnected: CancelableEvent
|
|
||||||
_connection_lock: asyncio.Lock
|
|
||||||
|
|
||||||
def __init__(self, config: Config) -> None:
|
def __init__(self, config: Config) -> None:
|
||||||
self.config = config
|
self.config = config
|
||||||
@ -97,34 +52,16 @@ class RPCClient:
|
|||||||
self._writer = None
|
self._writer = None
|
||||||
self._reader = None
|
self._reader = None
|
||||||
self._command_queue = asyncio.Queue()
|
self._command_queue = asyncio.Queue()
|
||||||
self.loop.create_task(self._command_loop())
|
|
||||||
self._read_task = None
|
|
||||||
self._connection_task = None
|
|
||||||
self._is_connected = CancelableEvent(self.loop)
|
|
||||||
self._is_disconnected = CancelableEvent(self.loop)
|
|
||||||
self._is_disconnected.set()
|
|
||||||
self._connection_lock = asyncio.Lock()
|
|
||||||
|
|
||||||
async def connect(self) -> None:
|
async def connect(self) -> None:
|
||||||
async with self._connection_lock:
|
if self._writer is not None:
|
||||||
if self._is_connected.cancelled():
|
return
|
||||||
raise asyncio.CancelledError()
|
|
||||||
if self._is_connected.is_set():
|
|
||||||
return
|
|
||||||
self._connection_task = self.loop.create_task(self._connect())
|
|
||||||
try:
|
|
||||||
await self._connection_task
|
|
||||||
finally:
|
|
||||||
self._connection_task = None
|
|
||||||
|
|
||||||
async def _connect(self) -> None:
|
|
||||||
if self.config["rpc.connection.type"] == "unix":
|
if self.config["rpc.connection.type"] == "unix":
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
r, w = await asyncio.open_unix_connection(self.config["rpc.connection.path"])
|
r, w = await asyncio.open_unix_connection(self.config["rpc.connection.path"])
|
||||||
break
|
break
|
||||||
except asyncio.CancelledError:
|
|
||||||
raise
|
|
||||||
except:
|
except:
|
||||||
self.log.warning(f'No unix socket available at {self.config["rpc.connection.path"]}, wait for it to exist...')
|
self.log.warning(f'No unix socket available at {self.config["rpc.connection.path"]}, wait for it to exist...')
|
||||||
await asyncio.sleep(10)
|
await asyncio.sleep(10)
|
||||||
@ -134,8 +71,6 @@ class RPCClient:
|
|||||||
r, w = await asyncio.open_connection(self.config["rpc.connection.host"],
|
r, w = await asyncio.open_connection(self.config["rpc.connection.host"],
|
||||||
self.config["rpc.connection.port"])
|
self.config["rpc.connection.port"])
|
||||||
break
|
break
|
||||||
except asyncio.CancelledError:
|
|
||||||
raise
|
|
||||||
except:
|
except:
|
||||||
self.log.warning(f'No TCP connection open at {self.config["rpc.connection.host"]}:{self.config["rpc.connection.path"]}, wait for it to become available...')
|
self.log.warning(f'No TCP connection open at {self.config["rpc.connection.host"]}:{self.config["rpc.connection.path"]}, wait for it to become available...')
|
||||||
await asyncio.sleep(10)
|
await asyncio.sleep(10)
|
||||||
@ -143,46 +78,16 @@ class RPCClient:
|
|||||||
raise RuntimeError("invalid rpc connection type")
|
raise RuntimeError("invalid rpc connection type")
|
||||||
self._reader = r
|
self._reader = r
|
||||||
self._writer = w
|
self._writer = w
|
||||||
self._read_task = self.loop.create_task(self._try_read_loop())
|
self.loop.create_task(self._try_read_loop())
|
||||||
self._is_connected.set()
|
self.loop.create_task(self._command_loop())
|
||||||
self._is_disconnected.clear()
|
|
||||||
await self.request("register", peer_id=self.config["appservice.address"])
|
await self.request("register", peer_id=self.config["appservice.address"])
|
||||||
|
|
||||||
async def disconnect(self) -> None:
|
async def disconnect(self) -> None:
|
||||||
async with self._connection_lock:
|
|
||||||
if self._is_disconnected.cancelled():
|
|
||||||
raise asyncio.CancelledError()
|
|
||||||
if self._is_disconnected.is_set():
|
|
||||||
return
|
|
||||||
await self._disconnect()
|
|
||||||
|
|
||||||
async def _disconnect(self) -> None:
|
|
||||||
if self._writer is not None:
|
if self._writer is not None:
|
||||||
self._writer.write_eof()
|
self._writer.write_eof()
|
||||||
await self._writer.drain()
|
await self._writer.drain()
|
||||||
if self._read_task is not None:
|
self._writer = None
|
||||||
self._read_task.cancel()
|
self._reader = None
|
||||||
self._read_task = None
|
|
||||||
self._on_disconnect()
|
|
||||||
|
|
||||||
def _on_disconnect(self) -> None:
|
|
||||||
self._reader = None
|
|
||||||
self._writer = None
|
|
||||||
self._is_connected.clear()
|
|
||||||
self._is_disconnected.set()
|
|
||||||
|
|
||||||
def wait_for_connection(self) -> Awaitable[None]:
|
|
||||||
return self._is_connected.wait()
|
|
||||||
|
|
||||||
def wait_for_disconnection(self) -> Awaitable[None]:
|
|
||||||
return self._is_disconnected.wait()
|
|
||||||
|
|
||||||
def cancel(self) -> None:
|
|
||||||
self._is_connected.cancel()
|
|
||||||
self._is_disconnected.cancel()
|
|
||||||
if self._connection_task is not None:
|
|
||||||
self._connection_task.cancel()
|
|
||||||
asyncio.run(self._disconnect())
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def _next_req_id(self) -> int:
|
def _next_req_id(self) -> int:
|
||||||
@ -214,7 +119,7 @@ class RPCClient:
|
|||||||
for handler in handlers:
|
for handler in handlers:
|
||||||
try:
|
try:
|
||||||
await handler(req)
|
await handler(req)
|
||||||
except:
|
except Exception:
|
||||||
self.log.exception("Exception in event handler")
|
self.log.exception("Exception in event handler")
|
||||||
|
|
||||||
async def _handle_incoming_line(self, line: str) -> None:
|
async def _handle_incoming_line(self, line: str) -> None:
|
||||||
@ -257,9 +162,7 @@ class RPCClient:
|
|||||||
async def _try_read_loop(self) -> None:
|
async def _try_read_loop(self) -> None:
|
||||||
try:
|
try:
|
||||||
await self._read_loop()
|
await self._read_loop()
|
||||||
except asyncio.CancelledError:
|
except Exception:
|
||||||
pass
|
|
||||||
except:
|
|
||||||
self.log.exception("Fatal error in read loop")
|
self.log.exception("Fatal error in read loop")
|
||||||
|
|
||||||
async def _read_loop(self) -> None:
|
async def _read_loop(self) -> None:
|
||||||
@ -275,8 +178,6 @@ class RPCClient:
|
|||||||
except asyncio.LimitOverrunError as e:
|
except asyncio.LimitOverrunError as e:
|
||||||
self.log.warning(f"Buffer overrun: {e}")
|
self.log.warning(f"Buffer overrun: {e}")
|
||||||
line += await self._reader.read(self._reader._limit)
|
line += await self._reader.read(self._reader._limit)
|
||||||
except asyncio.CancelledError:
|
|
||||||
raise
|
|
||||||
if not line:
|
if not line:
|
||||||
continue
|
continue
|
||||||
try:
|
try:
|
||||||
@ -286,12 +187,11 @@ class RPCClient:
|
|||||||
continue
|
continue
|
||||||
try:
|
try:
|
||||||
await self._handle_incoming_line(line_str)
|
await self._handle_incoming_line(line_str)
|
||||||
except asyncio.CancelledError:
|
except Exception:
|
||||||
raise
|
|
||||||
except:
|
|
||||||
self.log.exception("Failed to handle incoming request %s", line_str)
|
self.log.exception("Failed to handle incoming request %s", line_str)
|
||||||
self.log.debug("Reader disconnected")
|
self.log.debug("Reader disconnected")
|
||||||
self._on_disconnect()
|
self._reader = None
|
||||||
|
self._writer = None
|
||||||
|
|
||||||
async def _raw_request(self, command: str, is_secret: bool = False, **data: JSON) -> asyncio.Future[JSON]:
|
async def _raw_request(self, command: str, is_secret: bool = False, **data: JSON) -> asyncio.Future[JSON]:
|
||||||
req_id = self._next_req_id
|
req_id = self._next_req_id
|
||||||
@ -305,6 +205,5 @@ class RPCClient:
|
|||||||
return future
|
return future
|
||||||
|
|
||||||
async def request(self, command: str, **data: JSON) -> JSON:
|
async def request(self, command: str, **data: JSON) -> JSON:
|
||||||
await self.wait_for_connection()
|
|
||||||
future = await self._raw_request(command, **data)
|
future = await self._raw_request(command, **data)
|
||||||
return await future
|
return await future
|
||||||
|
@ -85,7 +85,6 @@ class User(DBUser, BaseUser):
|
|||||||
_connection_time: float
|
_connection_time: float
|
||||||
_db_instance: DBUser | None
|
_db_instance: DBUser | None
|
||||||
_sync_lock: SimpleLock
|
_sync_lock: SimpleLock
|
||||||
_is_rpc_reconnecting: bool
|
|
||||||
_logged_in_info: ProfileStruct | None
|
_logged_in_info: ProfileStruct | None
|
||||||
_logged_in_info_time: float
|
_logged_in_info_time: float
|
||||||
|
|
||||||
@ -122,7 +121,6 @@ class User(DBUser, BaseUser):
|
|||||||
self._sync_lock = SimpleLock(
|
self._sync_lock = SimpleLock(
|
||||||
"Waiting for thread sync to finish before handling %s", log=self.log
|
"Waiting for thread sync to finish before handling %s", log=self.log
|
||||||
)
|
)
|
||||||
self._is_rpc_reconnecting = False
|
|
||||||
self._logged_in_info = None
|
self._logged_in_info = None
|
||||||
self._logged_in_info_time = 0
|
self._logged_in_info_time = 0
|
||||||
|
|
||||||
@ -334,8 +332,6 @@ class User(DBUser, BaseUser):
|
|||||||
state_event=BridgeStateEvent.UNKNOWN_ERROR,
|
state_event=BridgeStateEvent.UNKNOWN_ERROR,
|
||||||
error_code="kt-reconnection-error",
|
error_code="kt-reconnection-error",
|
||||||
)
|
)
|
||||||
finally:
|
|
||||||
self._is_rpc_reconnecting = False
|
|
||||||
|
|
||||||
async def logout(self, *, remove_ktid: bool = True, reset_device: bool = False) -> None:
|
async def logout(self, *, remove_ktid: bool = True, reset_device: bool = False) -> None:
|
||||||
if self.client:
|
if self.client:
|
||||||
@ -426,16 +422,7 @@ class User(DBUser, BaseUser):
|
|||||||
sync_count = num_channels if sync_count < 0 else min(sync_count, num_channels)
|
sync_count = num_channels if sync_count < 0 else min(sync_count, num_channels)
|
||||||
await self.push_bridge_state(BridgeStateEvent.BACKFILLING)
|
await self.push_bridge_state(BridgeStateEvent.BACKFILLING)
|
||||||
self.log.debug(f"Syncing {sync_count} of {num_channels} channels...")
|
self.log.debug(f"Syncing {sync_count} of {num_channels} channels...")
|
||||||
|
for login_data in login_result.channelList[:sync_count]:
|
||||||
def get_channel_update_time(login_data: LoginDataItem):
|
|
||||||
channel_info = login_data.channel.info
|
|
||||||
return channel_info.lastChatLog.sendAt if channel_info.lastChatLog else 0
|
|
||||||
|
|
||||||
for login_data in sorted(
|
|
||||||
login_result.channelList,
|
|
||||||
reverse=True,
|
|
||||||
key=get_channel_update_time
|
|
||||||
)[:sync_count]:
|
|
||||||
try:
|
try:
|
||||||
await self._sync_channel(login_data)
|
await self._sync_channel(login_data)
|
||||||
except AuthenticationRequired:
|
except AuthenticationRequired:
|
||||||
@ -558,8 +545,9 @@ class User(DBUser, BaseUser):
|
|||||||
state = BridgeState(state_event=BridgeStateEvent.UNKNOWN_ERROR)
|
state = BridgeState(state_event=BridgeStateEvent.UNKNOWN_ERROR)
|
||||||
if self.is_connected:
|
if self.is_connected:
|
||||||
state.state_event = BridgeStateEvent.CONNECTED
|
state.state_event = BridgeStateEvent.CONNECTED
|
||||||
elif self._is_rpc_reconnecting or self.client:
|
# TODO
|
||||||
state.state_event = BridgeStateEvent.TRANSIENT_DISCONNECT
|
#elif self._is_logged_in and self._is_reconnecting:
|
||||||
|
# state.state_event = BridgeStateEvent.TRANSIENT_DISCONNECT
|
||||||
return [state]
|
return [state]
|
||||||
|
|
||||||
async def get_puppet(self) -> pu.Puppet | None:
|
async def get_puppet(self) -> pu.Puppet | None:
|
||||||
@ -594,18 +582,16 @@ class User(DBUser, BaseUser):
|
|||||||
# region KakaoTalk event handling
|
# region KakaoTalk event handling
|
||||||
|
|
||||||
async def on_connect(self) -> None:
|
async def on_connect(self) -> None:
|
||||||
self.is_connected = True
|
|
||||||
self._track_metric(METRIC_CONNECTED, True)
|
|
||||||
""" TODO Don't auto-resync channels if disconnection was too short
|
|
||||||
now = time.monotonic()
|
now = time.monotonic()
|
||||||
disconnected_at = self._connection_time
|
disconnected_at = self._connection_time
|
||||||
max_delay = self.config["bridge.resync_max_disconnected_time"]
|
max_delay = self.config["bridge.resync_max_disconnected_time"]
|
||||||
first_connect = self.is_connected is None
|
first_connect = self.is_connected is None
|
||||||
|
self.is_connected = True
|
||||||
|
self._track_metric(METRIC_CONNECTED, True)
|
||||||
if not first_connect and disconnected_at + max_delay < now:
|
if not first_connect and disconnected_at + max_delay < now:
|
||||||
duration = int(now - disconnected_at)
|
duration = int(now - disconnected_at)
|
||||||
self.log.debug(f"Disconnection lasted {duration} seconds, not re-syncing channels...")
|
self.log.debug(f"Disconnection lasted {duration} seconds")
|
||||||
"""
|
elif self.temp_disconnect_notices:
|
||||||
if self.temp_disconnect_notices:
|
|
||||||
await self.send_bridge_notice("Connected to KakaoTalk chats")
|
await self.send_bridge_notice("Connected to KakaoTalk chats")
|
||||||
await self.push_bridge_state(BridgeStateEvent.CONNECTED)
|
await self.push_bridge_state(BridgeStateEvent.CONNECTED)
|
||||||
|
|
||||||
@ -632,19 +618,6 @@ class User(DBUser, BaseUser):
|
|||||||
await self.logout()
|
await self.logout()
|
||||||
await self.send_bridge_notice(f"Disconnected from KakaoTalk: {reason_str} {reason_suffix}")
|
await self.send_bridge_notice(f"Disconnected from KakaoTalk: {reason_str} {reason_suffix}")
|
||||||
|
|
||||||
async def on_client_disconnect(self) -> None:
|
|
||||||
self.is_connected = False
|
|
||||||
self._track_metric(METRIC_CONNECTED, False)
|
|
||||||
self.client = None
|
|
||||||
if self._is_logged_in:
|
|
||||||
if self.temp_disconnect_notices:
|
|
||||||
await self.send_bridge_notice(
|
|
||||||
"Disconnected from KakaoTalk: backend helper module exited. "
|
|
||||||
"Will reconnect once module resumes."
|
|
||||||
)
|
|
||||||
self._is_rpc_reconnecting = True
|
|
||||||
asyncio.create_task(self.reload_session())
|
|
||||||
|
|
||||||
async def on_logged_in(self, oauth_credential: OAuthCredential) -> None:
|
async def on_logged_in(self, oauth_credential: OAuthCredential) -> None:
|
||||||
self.log.debug(f"Successfully logged in as {oauth_credential.userId}")
|
self.log.debug(f"Successfully logged in as {oauth_credential.userId}")
|
||||||
self.oauth_credential = oauth_credential
|
self.oauth_credential = oauth_credential
|
||||||
|
@ -24,10 +24,8 @@ import {
|
|||||||
util,
|
util,
|
||||||
} from "node-kakao"
|
} from "node-kakao"
|
||||||
/** @typedef {import("node-kakao").OAuthCredential} OAuthCredential */
|
/** @typedef {import("node-kakao").OAuthCredential} OAuthCredential */
|
||||||
/** @typedef {import("node-kakao").ChannelType} ChannelType */
|
|
||||||
/** @typedef {import("node-kakao").ReplyAttachment} ReplyAttachment */
|
|
||||||
/** @typedef {import("node-kakao").MentionStruct} MentionStruct */
|
|
||||||
/** @typedef {import("node-kakao/dist/talk").TalkChannelList} TalkChannelList */
|
/** @typedef {import("node-kakao/dist/talk").TalkChannelList} TalkChannelList */
|
||||||
|
/** @typedef {import("node-kakao").ChannelType} ChannelType */
|
||||||
|
|
||||||
import chat from "node-kakao/chat"
|
import chat from "node-kakao/chat"
|
||||||
const { KnownChatType } = chat
|
const { KnownChatType } = chat
|
||||||
@ -508,16 +506,13 @@ export default class PeerClient {
|
|||||||
* @param {string} req.mxid
|
* @param {string} req.mxid
|
||||||
* @param {Object} req.channel_props
|
* @param {Object} req.channel_props
|
||||||
* @param {string} req.text
|
* @param {string} req.text
|
||||||
* @param {?ReplyAttachment} req.reply_to
|
|
||||||
* @param {?MentionStruct[]} req.mentions
|
|
||||||
*/
|
*/
|
||||||
sendChat = async (req) => {
|
sendChat = async (req) => {
|
||||||
const talkChannel = await this.#getUserChannel(req.mxid, req.channel_props)
|
const talkChannel = await this.#getUserChannel(req.mxid, req.channel_props)
|
||||||
|
|
||||||
return await talkChannel.sendChat({
|
return await talkChannel.sendChat({
|
||||||
|
type: KnownChatType.TEXT,
|
||||||
text: req.text,
|
text: req.text,
|
||||||
type: !!req.reply_to ? KnownChatType.REPLY : KnownChatType.TEXT,
|
|
||||||
attachment: !req.mentions ? req.reply_to : {...req.reply_to, mentions: req.mentions},
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -526,7 +521,7 @@ export default class PeerClient {
|
|||||||
* @param {string} req.mxid
|
* @param {string} req.mxid
|
||||||
* @param {Object} req.channel_props
|
* @param {Object} req.channel_props
|
||||||
* @param {int} req.type
|
* @param {int} req.type
|
||||||
* @param {number[]} req.data
|
* @param {[number]} req.data
|
||||||
* @param {string} req.name
|
* @param {string} req.name
|
||||||
* @param {?int} req.width
|
* @param {?int} req.width
|
||||||
* @param {?int} req.height
|
* @param {?int} req.height
|
||||||
|
@ -52,6 +52,9 @@ export default class ClientManager {
|
|||||||
} catch (err) {
|
} catch (err) {
|
||||||
await fs.promises.mkdir(path.dirname(socketPath), 0o700)
|
await fs.promises.mkdir(path.dirname(socketPath), 0o700)
|
||||||
}
|
}
|
||||||
|
try {
|
||||||
|
await fs.promises.unlink(socketPath)
|
||||||
|
} catch (err) {}
|
||||||
await promisify(cb => this.server.listen(socketPath, cb))
|
await promisify(cb => this.server.listen(socketPath, cb))
|
||||||
await fs.promises.chmod(socketPath, 0o700)
|
await fs.promises.chmod(socketPath, 0o700)
|
||||||
this.log("Now listening at", socketPath)
|
this.log("Now listening at", socketPath)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user