Compare commits

..

12 Commits

Author SHA1 Message Date
e94c598e3b Force-disable web-based login until it is implemented 2022-04-08 05:16:12 -04:00
37c983ce0b Don't allow having two node modules use the same socket 2022-04-08 05:15:06 -04:00
3abb296109 Sync channels from most to least recently updated 2022-04-08 05:06:09 -04:00
cf9fbc6d57 Make some more missing inbound properties optional 2022-04-08 05:05:39 -04:00
8ac16e00fc Handle disconnections from the node module
Make the bridge module auto-reconnect to the node module in case the
latter ever crashes (or is started after the bridge module).

Also work towards more holistic auto-reconnect logic in general.
2022-04-08 05:04:46 -04:00
d452735691 Set body of media replies to what official client sets them to 2022-04-06 14:57:06 -04:00
ae9fd46bc8 Outgoing replies to replies 2022-04-06 14:33:01 -04:00
f682edb719 Add missing argument 2022-04-06 14:16:12 -04:00
26d212fd45 Format & put mentions in reply targets 2022-04-06 14:11:15 -04:00
84e6a5829d Outgoing mentions 2022-04-06 12:49:23 -04:00
6867e6b349 Incoming mentions 2022-04-05 18:21:35 -04:00
2b9c59a2af Outbound replies 2022-04-05 15:44:02 -04:00
13 changed files with 364 additions and 238 deletions

View File

@ -57,6 +57,7 @@ class KakaoTalkBridge(Bridge):
def prepare_bridge(self) -> None: def prepare_bridge(self) -> None:
super().prepare_bridge() super().prepare_bridge()
""" TODO Implement web login
if self.config["appservice.public.enabled"]: if self.config["appservice.public.enabled"]:
secret = self.config["appservice.public.shared_secret"] secret = self.config["appservice.public.shared_secret"]
self.public_website = PublicBridgeWebsite(loop=self.loop, shared_secret=secret) self.public_website = PublicBridgeWebsite(loop=self.loop, shared_secret=secret)
@ -65,21 +66,21 @@ class KakaoTalkBridge(Bridge):
) )
else: else:
self.public_website = None self.public_website = None
"""
self.public_website = None
def prepare_stop(self) -> None: def prepare_stop(self) -> None:
self.log.debug("Stopping RPC connection")
KakaoTalkClient.stop_cls()
self.log.debug("Stopping puppet syncers") self.log.debug("Stopping puppet syncers")
for puppet in Puppet.by_custom_mxid.values(): for puppet in Puppet.by_custom_mxid.values():
puppet.stop() puppet.stop()
self.log.debug("Stopping kakaotalk listeners") self.log.debug("Stopping kakaotalk listeners")
User.shutdown = True User.shutdown = True
self.add_shutdown_actions(user.save() for user in User.by_mxid.values()) self.add_shutdown_actions(user.save() for user in User.by_mxid.values())
self.add_shutdown_actions(KakaoTalkClient.stop_cls())
async def start(self) -> None: async def start(self) -> None:
# Block all other startup actions until RPC is ready KakaoTalkClient.init_cls(self.config)
# TODO Remove when/if node backend is replaced with native
await KakaoTalkClient.init_cls(self.config)
self.add_startup_actions(User.init_cls(self)) self.add_startup_actions(User.init_cls(self))
self.add_startup_actions(Puppet.init_cls(self)) self.add_startup_actions(Puppet.init_cls(self))
Portal.init_cls(self) Portal.init_cls(self)

View File

@ -23,7 +23,6 @@ from mautrix.types import UserID
from mautrix.util.config import ConfigUpdateHelper, ForbiddenDefault, ForbiddenKey from mautrix.util.config import ConfigUpdateHelper, ForbiddenDefault, ForbiddenKey
# TODO Remove unneeded configs!!
class Config(BaseBridgeConfig): class Config(BaseBridgeConfig):
def __getitem__(self, key: str) -> Any: def __getitem__(self, key: str) -> Any:
try: try:
@ -94,31 +93,15 @@ class Config(BaseBridgeConfig):
copy("bridge.backfill.initial_limit") copy("bridge.backfill.initial_limit")
copy("bridge.backfill.missed_limit") copy("bridge.backfill.missed_limit")
copy("bridge.backfill.disable_notifications") copy("bridge.backfill.disable_notifications")
if "bridge.periodic_reconnect_interval" in self: """ TODO
base["bridge.periodic_reconnect.interval"] = self["bridge.periodic_reconnect_interval"]
base["bridge.periodic_reconnect.mode"] = self["bridge.periodic_reconnect_mode"]
else:
copy("bridge.periodic_reconnect.interval") copy("bridge.periodic_reconnect.interval")
copy("bridge.periodic_reconnect.mode")
copy("bridge.periodic_reconnect.always") copy("bridge.periodic_reconnect.always")
copy("bridge.periodic_reconnect.min_connected_time") copy("bridge.periodic_reconnect.min_connected_time")
"""
copy("bridge.resync_max_disconnected_time") copy("bridge.resync_max_disconnected_time")
copy("bridge.sync_on_startup") copy("bridge.sync_on_startup")
copy("bridge.temporary_disconnect_notices") copy("bridge.temporary_disconnect_notices")
copy("bridge.disable_bridge_notices") copy("bridge.disable_bridge_notices")
if "bridge.refresh_on_reconnection_fail" in self:
base["bridge.on_reconnection_fail.action"] = (
"refresh" if self["bridge.refresh_on_reconnection_fail"] else None
)
base["bridge.on_reconnection_fail.wait_for"] = 0
elif "bridge.on_reconnection_fail.refresh" in self:
base["bridge.on_reconnection_fail.action"] = (
"refresh" if self["bridge.on_reconnection_fail.refresh"] else None
)
copy("bridge.on_reconnection_fail.wait_for")
else:
copy("bridge.on_reconnection_fail.action")
copy("bridge.on_reconnection_fail.wait_for")
copy("bridge.resend_bridge_info") copy("bridge.resend_bridge_info")
copy("bridge.mute_bridging") copy("bridge.mute_bridging")
copy("bridge.tag_only_on_create") copy("bridge.tag_only_on_create")
@ -126,13 +109,14 @@ class Config(BaseBridgeConfig):
copy_dict("bridge.permissions") copy_dict("bridge.permissions")
""" TODO
for key in ( for key in (
"bridge.periodic_reconnect.interval", "bridge.periodic_reconnect.interval",
"bridge.on_reconnection_fail.wait_for",
): ):
value = base.get(key, None) value = base.get(key, None)
if isinstance(value, list) and len(value) != 2: if isinstance(value, list) and len(value) != 2:
raise ValueError(f"{key} must only be a list of two items") raise ValueError(f"{key} must only be a list of two items")
"""
copy("rpc.connection.type") copy("rpc.connection.type")
if base["rpc.connection.type"] == "unix": if base["rpc.connection.type"] == "unix":

View File

@ -50,6 +50,7 @@ appservice:
max_size: 10 max_size: 10
# Public part of web server for out-of-Matrix interaction with the bridge. # Public part of web server for out-of-Matrix interaction with the bridge.
# TODO Implement web login
public: public:
# Whether or not the public-facing endpoints should be enabled. # Whether or not the public-facing endpoints should be enabled.
enabled: false enabled: false
@ -200,15 +201,13 @@ bridge:
# If using double puppeting, should notifications be disabled # If using double puppeting, should notifications be disabled
# while the initial backfill is in progress? # while the initial backfill is in progress?
disable_notifications: false disable_notifications: false
# TODO Confirm this isn't needed # TODO Implement this
#periodic_reconnect: #periodic_reconnect:
# # Interval in seconds in which to automatically reconnect all users. # # Interval in seconds in which to automatically reconnect all users.
# # This can be used to automatically mitigate the bug where KakaoTalk stops sending messages. # # This may prevent KakaoTalk from "switching servers".
# # Set to -1 to disable periodic reconnections entirely. # # Set to -1 to disable periodic reconnections entirely.
# # Set to a list of two items to randomize the interval (min, max). # # Set to a list of two items to randomize the interval (min, max).
# interval: -1 # interval: -1
# # What to do in periodic reconnects. Either "refresh" or "reconnect"
# mode: refresh
# # Should even disconnected users be reconnected? # # Should even disconnected users be reconnected?
# always: false # always: false
# # Only reconnect if the user has been connected for longer than this value # # Only reconnect if the user has been connected for longer than this value
@ -216,6 +215,7 @@ bridge:
# The number of seconds that a disconnection can last without triggering an automatic re-sync # The number of seconds that a disconnection can last without triggering an automatic re-sync
# and missed message backfilling when reconnecting. # and missed message backfilling when reconnecting.
# Set to 0 to always re-sync, or -1 to never re-sync automatically. # Set to 0 to always re-sync, or -1 to never re-sync automatically.
# TODO Actually use this setting
resync_max_disconnected_time: 5 resync_max_disconnected_time: 5
# Should the bridge do a resync on startup? # Should the bridge do a resync on startup?
sync_on_startup: true sync_on_startup: true

View File

@ -16,11 +16,12 @@
from __future__ import annotations from __future__ import annotations
from typing import Match from typing import Match
from html import escape
import re import re
from mautrix.types import Format, MessageType, TextMessageEventContent from mautrix.types import Format, MessageType, TextMessageEventContent
from ..kt.types.chat.attachment.mention import MentionStruct
from .. import puppet as pu, user as u from .. import puppet as pu, user as u
_START = r"^|\s" _START = r"^|\s"
@ -79,74 +80,21 @@ def _handle_blockquote(output: list[str], blockquote: bool, line: str) -> tuple[
return blockquote, line return blockquote, line
def _handle_codeblock_pre( async def kakaotalk_to_matrix(msg: str | None, mentions: list[MentionStruct] | None) -> TextMessageEventContent:
output: list[str], codeblock: bool, line: str # TODO Shouts
) -> tuple[bool, str, tuple[str | None, str | None, str | None]]:
cb = line.find("```")
cb_lang = None
cb_content = None
post_cb_content = None
if cb != -1:
if not codeblock:
cb_lang = line[cb + 3 :]
if "```" in cb_lang:
end = cb_lang.index("```")
cb_content = cb_lang[:end]
post_cb_content = cb_lang[end + 3 :]
cb_lang = ""
else:
codeblock = True
line = line[:cb]
else:
output.append("</code></pre>")
codeblock = False
line = line[cb + 3 :]
return codeblock, line, (cb_lang, cb_content, post_cb_content)
def _handle_codeblock_post(
output: list[str], cb_lang: str | None, cb_content: str | None, post_cb_content: str | None
) -> None:
if cb_lang is not None:
if cb_lang:
output.append(f'<pre><code class="language-{cb_lang}">')
else:
output.append("<pre><code>")
if cb_content:
output.append(cb_content)
output.append("</code></pre>")
output.append(_convert_formatting(post_cb_content))
async def kakaotalk_to_matrix(msg: str) -> TextMessageEventContent:
text = msg or "" text = msg or ""
mentions = []
content = TextMessageEventContent(msgtype=MessageType.TEXT, body=text) content = TextMessageEventContent(msgtype=MessageType.TEXT, body=text)
if mentions:
mention_user_ids = [] mention_user_ids = []
for m in reversed(mentions): at_chunks = text.split("@")
original = text[m.offset : m.offset + m.length] for m in mentions:
if len(original) > 0 and original[0] == "@": for idx in m.at:
original = original[1:] chunk = at_chunks[idx]
original = chunk[:m.len]
mention_user_ids.append(int(m.user_id)) mention_user_ids.append(int(m.user_id))
text = f"{text[:m.offset]}@{m.user_id}\u2063{original}\u2063{text[m.offset + m.length:]}" at_chunks[idx] = f"{m.user_id}\u2063{original}\u2063{chunk[m.len:]}"
html = escape(text) text = "@".join(at_chunks)
output = []
if html:
codeblock = False
blockquote = False
line: str
lines = html.split("\n")
for i, line in enumerate(lines):
blockquote, line = _handle_blockquote(output, blockquote, line)
codeblock, line, post_args = _handle_codeblock_pre(output, codeblock, line)
output.append(_convert_formatting(line))
if i != len(lines) - 1:
if codeblock:
output.append("\n")
else:
output.append("<br/>")
_handle_codeblock_post(output, *post_args)
html = "".join(output)
mention_user_map = {} mention_user_map = {}
for ktid in mention_user_ids: for ktid in mention_user_ids:
@ -157,14 +105,13 @@ async def kakaotalk_to_matrix(msg: str) -> TextMessageEventContent:
puppet = await pu.Puppet.get_by_ktid(ktid, create=False) puppet = await pu.Puppet.get_by_ktid(ktid, create=False)
mention_user_map[ktid] = puppet.mxid if puppet else None mention_user_map[ktid] = puppet.mxid if puppet else None
if mention_user_map:
def _mention_replacer(match: Match) -> str: def _mention_replacer(match: Match) -> str:
mxid = mention_user_map[int(match.group(1))] mxid = mention_user_map[int(match.group(1))]
if not mxid: if not mxid:
return match.group(2) return match.group(2)
return f'<a href="https://matrix.to/#/{mxid}">{match.group(2)}</a>' return f'<a href="https://matrix.to/#/{mxid}">{match.group(2)}</a>'
html = MENTION_REGEX.sub(_mention_replacer, html)
if html != escape(content.body).replace("\n", "<br/>\n"):
content.format = Format.HTML content.format = Format.HTML
content.formatted_body = html content.formatted_body = MENTION_REGEX.sub(_mention_replacer, text).replace("\n", "<br/>\n")
return content return content

View File

@ -17,29 +17,35 @@ from __future__ import annotations
from typing import NamedTuple from typing import NamedTuple
from mautrix.types import Format, MessageEventContent, RelationType, RoomID from mautrix.appservice import IntentAPI
from mautrix.types import Format, MessageEventContent, RelationType, RoomID, UserID
from mautrix.util.formatter import ( from mautrix.util.formatter import (
EntityString, EntityString,
EntityType, EntityType,
MarkdownString, MarkdownString,
MatrixParser as BaseMatrixParser, MatrixParser,
SimpleEntity, SimpleEntity,
) )
from mautrix.util.logging import TraceLogger from mautrix.util.logging import TraceLogger
from ..kt.types.bson import Long
from ..kt.types.chat import KnownChatType
from ..kt.types.chat.attachment import ReplyAttachment, MentionStruct
from ..kt.client.types import TO_MSGTYPE_MAP
from .. import puppet as pu, user as u from .. import puppet as pu, user as u
from ..db import Message as DBMessage from ..db import Message as DBMessage
class SendParams(NamedTuple): class SendParams(NamedTuple):
text: str text: str
mentions: list[None] mentions: list[MentionStruct] | None
reply_to: str reply_to: ReplyAttachment
class FacebookFormatString(EntityString[SimpleEntity, EntityType], MarkdownString): class KakaoTalkFormatString(EntityString[SimpleEntity, EntityType], MarkdownString):
def format(self, entity_type: EntityType, **kwargs) -> FacebookFormatString: def format(self, entity_type: EntityType, **kwargs) -> KakaoTalkFormatString:
prefix = suffix = ""
if entity_type == EntityType.USER_MENTION: if entity_type == EntityType.USER_MENTION:
self.entities.append( self.entities.append(
SimpleEntity( SimpleEntity(
@ -49,72 +55,110 @@ class FacebookFormatString(EntityString[SimpleEntity, EntityType], MarkdownStrin
extra_info={"user_id": kwargs["user_id"]}, extra_info={"user_id": kwargs["user_id"]},
) )
) )
self.text = f"@{self.text}"
return self return self
elif entity_type == EntityType.BOLD:
prefix = suffix = "*"
elif entity_type == EntityType.ITALIC: class ToKakaoTalkParser(MatrixParser[KakaoTalkFormatString]):
prefix = suffix = "_" fs = KakaoTalkFormatString
elif entity_type == EntityType.STRIKETHROUGH:
prefix = suffix = "~"
elif entity_type == EntityType.URL: async def _get_id_from_mxid(mxid: UserID) -> Long | None:
if kwargs["url"] != self.text: user = await u.User.get_by_mxid(mxid, create=False)
suffix = f" ({kwargs['url']})" if user and user.ktid:
elif entity_type == EntityType.PREFORMATTED: return user.ktid
prefix = f"```{kwargs['language']}\n"
suffix = "\n```"
elif entity_type == EntityType.INLINE_CODE:
prefix = suffix = "`"
elif entity_type == EntityType.BLOCKQUOTE:
children = self.trim().split("\n")
children = [child.prepend("> ") for child in children]
return self.join(children, "\n")
elif entity_type == EntityType.HEADER:
prefix = "#" * kwargs["size"] + " "
else: else:
return self puppet = await pu.Puppet.get_by_mxid(mxid, create=False)
return puppet.ktid if puppet else None
self._offset_entities(len(prefix))
self.text = f"{prefix}{self.text}{suffix}"
return self
class MatrixParser(BaseMatrixParser[FacebookFormatString]):
fs = FacebookFormatString
async def matrix_to_kakaotalk( async def matrix_to_kakaotalk(
content: MessageEventContent, room_id: RoomID, log: TraceLogger content: MessageEventContent,
room_id: RoomID,
log: TraceLogger,
intent: IntentAPI,
skip_reply: bool = False
) -> SendParams: ) -> SendParams:
mentions = [] # NOTE By design, this *throws* if user intent can't be matched (i.e. if a reply can't be created)
reply_to = None if content.relates_to.rel_type == RelationType.REPLY and not skip_reply:
if content.relates_to.rel_type == RelationType.REPLY:
message = await DBMessage.get_by_mxid(content.relates_to.event_id, room_id) message = await DBMessage.get_by_mxid(content.relates_to.event_id, room_id)
if message: if not message:
content.trim_reply_fallback() raise ValueError(
reply_to = message.ktid
else:
log.warning(
f"Couldn't find reply target {content.relates_to.event_id}" f"Couldn't find reply target {content.relates_to.event_id}"
" to bridge text message reply metadata to Facebook" " to bridge text message reply metadata to KakaoTalk"
) )
if content.get("format", None) == Format.HTML and content["formatted_body"]: try:
parsed = await MatrixParser().parse(content["formatted_body"]) src_event = await intent.get_event(room_id, message.mxid)
except:
log.exception(f"Failed to find Matrix event for reply target {message.mxid}")
raise
src_kt_sender = await _get_id_from_mxid(src_event.sender)
if src_kt_sender is None:
raise ValueError(
f"Found no KakaoTalk user ID for reply target sender {src_event.sender}"
)
content.trim_reply_fallback()
src_converted = await matrix_to_kakaotalk(src_event.content, room_id, log, intent, skip_reply=True)
if src_event.content.relates_to.rel_type == RelationType.REPLY:
src_type = KnownChatType.REPLY
src_message = src_converted.text
else:
src_type = TO_MSGTYPE_MAP[src_event.content.msgtype]
if src_type == KnownChatType.FILE:
src_message = _media_type_reply_body_map[KnownChatType.FILE] + src_converted.text
else:
src_message = _media_type_reply_body_map.get(src_type, src_converted.text)
reply_to = ReplyAttachment(
# NOTE mentions will be merged into this later
# TODO Set this for emoticon reply, but must first support them
attach_only=False,
# TODO If replying with media works, must set type AND all attachment properties
# But then, the reply object must be an intersection of a ReplyAttachment and something else
#attach_type=TO_MSGTYPE_MAP.get(content.msgtype),
# TODO Confirm why official client sets this to 0, and whether this should be left as None instead
attach_type=0,
src_logId=message.ktid,
src_mentions=src_converted.mentions or [],
src_message=src_message,
src_type=src_type,
src_userId=src_kt_sender,
)
else:
reply_to = None
if content.get("format", None) == Format.HTML and content["formatted_body"] and content.msgtype.is_text:
parsed = await ToKakaoTalkParser().parse(content["formatted_body"])
text = parsed.text text = parsed.text
mentions = [] mentions_by_user: dict[Long, MentionStruct] = {}
for mention in parsed.entities: # Make sure to not create remote mentions for any remote user not in the room
if parsed.entities:
joined_members = set(await intent.get_room_members(room_id))
last_offset = 0
at = 0
for mention in sorted(parsed.entities, key=lambda entity: entity.offset):
mxid = mention.extra_info["user_id"] mxid = mention.extra_info["user_id"]
user = await u.User.get_by_mxid(mxid, create=False) if mxid not in joined_members:
if user and user.ktid:
ktid = user.ktid
else:
puppet = await pu.Puppet.get_by_mxid(mxid, create=False)
if puppet:
ktid = puppet.ktid
else:
continue continue
#mentions.append( ktid = await _get_id_from_mxid(mxid)
# Mention(user_id=str(ktid), offset=mention.offset, length=mention.length) if ktid is None:
#) continue
at += text[last_offset:mention.offset+1].count("@")
last_offset = mention.offset+1
mention = mentions_by_user.setdefault(ktid, MentionStruct(
at=[],
len=mention.length,
user_id=ktid,
))
mention.at.append(at)
mentions = list(mentions_by_user.values()) if mentions_by_user else None
else: else:
text = content.body text = content.body
mentions = None
return SendParams(text=text, mentions=mentions, reply_to=reply_to) return SendParams(text=text, mentions=mentions, reply_to=reply_to)
_media_type_reply_body_map: dict[KnownChatType, str] = {
KnownChatType.PHOTO: "Photo",
KnownChatType.VIDEO: "Video",
KnownChatType.AUDIO: "Voice Note",
KnownChatType.FILE: "File: ",
}

View File

@ -22,7 +22,8 @@ with any other potential backend.
from __future__ import annotations from __future__ import annotations
from typing import TYPE_CHECKING, cast, Type, Optional, Union from typing import TYPE_CHECKING, cast, ClassVar, Type, Optional, Union
import asyncio
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
import logging import logging
import urllib.request import urllib.request
@ -41,6 +42,7 @@ from ..types.api.struct import FriendListStruct
from ..types.bson import Long from ..types.bson import Long
from ..types.client.client_session import LoginResult from ..types.client.client_session import LoginResult
from ..types.chat import Chatlog, KnownChatType from ..types.chat import Chatlog, KnownChatType
from ..types.chat.attachment import MentionStruct, ReplyAttachment
from ..types.oauth import OAuthCredential, OAuthInfo from ..types.oauth import OAuthCredential, OAuthInfo
from ..types.packet.chat.kickout import KnownKickoutType, KickoutRes from ..types.packet.chat.kickout import KnownKickoutType, KickoutRes
from ..types.request import ( from ..types.request import (
@ -63,7 +65,7 @@ except ImportError:
if TYPE_CHECKING: if TYPE_CHECKING:
from mautrix.types import JSON from mautrix.types import JSON
from ...user import User from ... import user as u
@asynccontextmanager @asynccontextmanager
@ -78,15 +80,22 @@ class Client:
_rpc_client: RPCClient _rpc_client: RPCClient
@classmethod @classmethod
async def init_cls(cls, config: Config) -> None: def init_cls(cls, config: Config) -> None:
"""Initialize RPC to the Node backend.""" """Initialize RPC to the Node backend."""
cls._rpc_client = RPCClient(config) cls._rpc_client = RPCClient(config)
await cls._rpc_client.connect() # NOTE No need to store this, as cancelling the RPCClient will cancel this too
asyncio.create_task(cls._keep_connected())
@classmethod @classmethod
async def stop_cls(cls) -> None: async def _keep_connected(cls) -> None:
while True:
await cls._rpc_client.connect()
await cls._rpc_client.wait_for_disconnection()
@classmethod
def stop_cls(cls) -> None:
"""Stop and disconnect from the Node backend.""" """Stop and disconnect from the Node backend."""
await cls._rpc_client.disconnect() cls._rpc_client.cancel()
# region tokenless commands # region tokenless commands
@ -123,12 +132,15 @@ class Client:
# endregion # endregion
user: u.User
_rpc_disconnection_task: asyncio.Task | None
http: ClientSession http: ClientSession
log: TraceLogger log: TraceLogger
def __init__(self, user: User, log: Optional[TraceLogger] = None): def __init__(self, user: u.User, log: Optional[TraceLogger] = None):
"""Create a per-user client object for user-specific client functionality.""" """Create a per-user client object for user-specific client functionality."""
self.user = user self.user = user
self._rpc_disconnection_task = None
# TODO Let the Node backend use a proxy too! # TODO Let the Node backend use a proxy too!
connector = None connector = None
@ -187,13 +199,27 @@ class Client:
Receive the user's profile info in response. Receive the user's profile info in response.
""" """
profile_req_struct = await self._api_user_request_result(ProfileReqStruct, "start") profile_req_struct = await self._api_user_request_result(ProfileReqStruct, "start")
if not self._rpc_disconnection_task:
self._rpc_disconnection_task = asyncio.create_task(self._rpc_disconnection_handler())
else:
self.log.warning("Called \"start\" on an already-started client")
return profile_req_struct.profile return profile_req_struct.profile
async def stop(self) -> None: async def stop(self) -> None:
"""Immediately stop bridging this user.""" """Immediately stop bridging this user."""
self._stop_listen() self._stop_listen()
if self._rpc_disconnection_task:
self._rpc_disconnection_task.cancel()
else:
self.log.warning("Called \"stop\" on an already-stopped client")
await self._rpc_client.request("stop", mxid=self.user.mxid) await self._rpc_client.request("stop", mxid=self.user.mxid)
async def _rpc_disconnection_handler(self) -> None:
await self._rpc_client.wait_for_disconnection()
self._rpc_disconnection_task = None
self._stop_listen()
asyncio.create_task(self.user.on_client_disconnect())
async def renew_and_save(self) -> None: async def renew_and_save(self) -> None:
"""Renew and save the user's session tokens.""" """Renew and save the user's session tokens."""
oauth_info = await self._api_request_result(OAuthInfo, "renew", oauth_credential=self._oauth_credential) oauth_info = await self._api_request_result(OAuthInfo, "renew", oauth_credential=self._oauth_credential)
@ -273,12 +299,20 @@ class Client:
await self._rpc_client.request("get_memo_ids", mxid=self.user.mxid) await self._rpc_client.request("get_memo_ids", mxid=self.user.mxid)
) )
async def send_message(self, channel_props: ChannelProps, text: str) -> Chatlog: async def send_message(
self,
channel_props: ChannelProps,
text: str,
reply_to: ReplyAttachment | None,
mentions: list[MentionStruct] | None,
) -> Chatlog:
return await self._api_user_request_result( return await self._api_user_request_result(
Chatlog, Chatlog,
"send_chat", "send_chat",
channel_props=channel_props.serialize(), channel_props=channel_props.serialize(),
text=text, text=text,
reply_to=reply_to.serialize() if reply_to is not None else None,
mentions=[m.serialize() for m in mentions] if mentions is not None else None,
) )
async def send_media( async def send_media(

View File

@ -25,11 +25,11 @@ from .mention import MentionStruct
@dataclass(kw_only=True) @dataclass(kw_only=True)
class ReplyAttachment(Attachment): class ReplyAttachment(Attachment):
attach_only: bool attach_only: bool = None # NOTE Made optional
attach_type: int attach_type: Optional[ChatType] = None # NOTE Changed from int for outgoing typeless replies
src_linkId: Optional[Long] = None src_linkId: Optional[Long] = None
src_logId: Long src_logId: Long
src_mentions: list[MentionStruct] src_mentions: list[MentionStruct] = None # NOTE Made optional
src_message: str src_message: str
src_type: ChatType src_type: ChatType
src_userId: Long src_userId: Long

View File

@ -94,15 +94,12 @@ ResultType = TypeVar("ResultType", bound=Serializable)
def ResultListType(result_type: Type[ResultType]): def ResultListType(result_type: Type[ResultType]):
class _ResultListType(list[result_type], Serializable): class _ResultListType(list[result_type], Serializable):
def __init__(self, iterable: Iterable[result_type]=()):
list.__init__(self, (result_type.deserialize(x) for x in iterable))
def serialize(self) -> list[JSON]: def serialize(self) -> list[JSON]:
return [v.serialize() for v in self] return [v.serialize() for v in self]
@classmethod @classmethod
def deserialize(cls, data: list[JSON]) -> "_ResultListType": def deserialize(cls, data: list[JSON]) -> "_ResultListType":
return cls(data) return [result_type.deserialize(item) for item in data]
return _ResultListType return _ResultListType

View File

@ -46,6 +46,7 @@ from mautrix.types import (
Membership, Membership,
MessageEventContent, MessageEventContent,
MessageType, MessageType,
RelationType,
RoomID, RoomID,
TextMessageEventContent, TextMessageEventContent,
UserID, UserID,
@ -68,6 +69,7 @@ from .kt.types.channel.channel_info import ChannelInfo
from .kt.types.channel.channel_type import KnownChannelType, ChannelType from .kt.types.channel.channel_type import KnownChannelType, ChannelType
from .kt.types.chat import Chatlog, ChatType, KnownChatType from .kt.types.chat import Chatlog, ChatType, KnownChatType
from .kt.types.chat.attachment import ( from .kt.types.chat.attachment import (
Attachment,
AudioAttachment, AudioAttachment,
#FileAttachment, #FileAttachment,
MediaAttachment, MediaAttachment,
@ -770,6 +772,8 @@ class Portal(DBPortal, BasePortal):
) -> None: ) -> None:
if message.get_edit(): if message.get_edit():
raise NotImplementedError("Edits are not supported by the KakaoTalk bridge.") raise NotImplementedError("Edits are not supported by the KakaoTalk bridge.")
if message.relates_to.rel_type == RelationType.REPLY and not message.msgtype.is_text:
raise NotImplementedError("Replying with non-text content is not supported by the KakaoTalk bridge.")
sender, is_relay = await self.get_relay_sender(orig_sender, f"message {event_id}") sender, is_relay = await self.get_relay_sender(orig_sender, f"message {event_id}")
if not sender: if not sender:
raise Exception("not logged in") raise Exception("not logged in")
@ -802,14 +806,13 @@ class Portal(DBPortal, BasePortal):
async def _handle_matrix_text( async def _handle_matrix_text(
self, event_id: EventID, sender: u.User, message: TextMessageEventContent self, event_id: EventID, sender: u.User, message: TextMessageEventContent
) -> None: ) -> None:
converted = await matrix_to_kakaotalk(message, self.mxid, self.log) converted = await matrix_to_kakaotalk(message, self.mxid, self.log, self.main_intent)
try: try:
chatlog = await sender.client.send_message( chatlog = await sender.client.send_message(
self.channel_props, self.channel_props,
text=converted.text, text=converted.text,
# TODO reply_to=converted.reply_to,
#mentions=converted.mentions, mentions=converted.mentions,
#reply_to=converted.reply_to,
) )
except CommandException as e: except CommandException as e:
self.log.debug(f"Error handling Matrix message {event_id}: {e!s}") self.log.debug(f"Error handling Matrix message {event_id}: {e!s}")
@ -837,18 +840,6 @@ class Portal(DBPortal, BasePortal):
else: else:
raise NotImplementedError("No file or URL specified") raise NotImplementedError("No file or URL specified")
mimetype = message.info.mimetype or magic.mimetype(data) mimetype = message.info.mimetype or magic.mimetype(data)
""" TODO Replies
reply_to = None
if message.relates_to.rel_type == RelationType.REPLY:
reply_to_msg = await DBMessage.get_by_mxid(message.relates_to.event_id, self.mxid)
if reply_to_msg:
reply_to = reply_to_msg.ktid
else:
self.log.warning(
f"Couldn't find reply target {message.relates_to.event_id}"
" to bridge media message reply metadata to KakaoTalk"
)
"""
filename = message.body filename = message.body
width, height = None, None width, height = None, None
if message.info in (MessageType.IMAGE, MessageType.STICKER, MessageType.VIDEO): if message.info in (MessageType.IMAGE, MessageType.STICKER, MessageType.VIDEO):
@ -863,8 +854,6 @@ class Portal(DBPortal, BasePortal):
width=width, width=width,
height=height, height=height,
ext=guess_extension(mimetype)[1:], ext=guess_extension(mimetype)[1:],
# TODO
#reply_to=reply_to,
) )
except CommandException as e: except CommandException as e:
self.log.debug(f"Error uploading media for Matrix message {event_id}: {e!s}") self.log.debug(f"Error uploading media for Matrix message {event_id}: {e!s}")
@ -1093,12 +1082,12 @@ class Portal(DBPortal, BasePortal):
async def _handle_remote_text( async def _handle_remote_text(
self, self,
intent: IntentAPI, intent: IntentAPI,
attachment: Attachment | None,
timestamp: int, timestamp: int,
message_text: str | None, message_text: str | None,
**_ **_
) -> list[EventID]: ) -> list[EventID]:
# TODO Handle mentions properly content = await kakaotalk_to_matrix(message_text, attachment.mentions if attachment else None)
content = await kakaotalk_to_matrix(message_text)
return [await self._send_message(intent, content, timestamp=timestamp)] return [await self._send_message(intent, content, timestamp=timestamp)]
async def _handle_remote_reply( async def _handle_remote_reply(
@ -1109,7 +1098,7 @@ class Portal(DBPortal, BasePortal):
message_text: str, message_text: str,
**_ **_
) -> list[EventID]: ) -> list[EventID]:
content = await kakaotalk_to_matrix(message_text) content = await kakaotalk_to_matrix(message_text, attachment.mentions)
await self._add_remote_reply(content, attachment) await self._add_remote_reply(content, attachment)
return [await self._send_message(intent, content, timestamp=timestamp)] return [await self._send_message(intent, content, timestamp=timestamp)]

View File

@ -29,6 +29,46 @@ from .types import RPCError
EventHandler = Callable[[dict[str, Any]], Awaitable[None]] EventHandler = Callable[[dict[str, Any]], Awaitable[None]]
class CancelableEvent:
_event: asyncio.Event
_task: asyncio.Task | None
_cancelled: bool
_loop: asyncio.AbstractEventLoop
def __init__(self, loop: asyncio.AbstractEventLoop | None):
self._event = asyncio.Event()
self._task = None
self._cancelled = False
self._loop = loop or asyncio.get_running_loop()
def is_set(self) -> bool:
return self._event.is_set()
def set(self) -> None:
self._event.set()
self._task = None
def clear(self) -> None:
self._event.clear()
async def wait(self) -> None:
if self._cancelled:
raise asyncio.CancelledError()
if self._event.is_set():
return
if not self._task:
self._task = asyncio.create_task(self._event.wait())
await self._task
def cancel(self) -> None:
self._cancelled = True
if self._task is not None:
self._task.cancel()
def cancelled(self) -> bool:
return self._cancelled
class RPCClient: class RPCClient:
config: Config config: Config
loop: asyncio.AbstractEventLoop loop: asyncio.AbstractEventLoop
@ -41,6 +81,11 @@ class RPCClient:
_response_waiters: dict[int, asyncio.Future[JSON]] _response_waiters: dict[int, asyncio.Future[JSON]]
_event_handlers: dict[str, list[EventHandler]] _event_handlers: dict[str, list[EventHandler]]
_command_queue: asyncio.Queue _command_queue: asyncio.Queue
_read_task: asyncio.Task | None
_connection_task: asyncio.Task | None
_is_connected: CancelableEvent
_is_disconnected: CancelableEvent
_connection_lock: asyncio.Lock
def __init__(self, config: Config) -> None: def __init__(self, config: Config) -> None:
self.config = config self.config = config
@ -52,16 +97,34 @@ class RPCClient:
self._writer = None self._writer = None
self._reader = None self._reader = None
self._command_queue = asyncio.Queue() self._command_queue = asyncio.Queue()
self.loop.create_task(self._command_loop())
self._read_task = None
self._connection_task = None
self._is_connected = CancelableEvent(self.loop)
self._is_disconnected = CancelableEvent(self.loop)
self._is_disconnected.set()
self._connection_lock = asyncio.Lock()
async def connect(self) -> None: async def connect(self) -> None:
if self._writer is not None: async with self._connection_lock:
if self._is_connected.cancelled():
raise asyncio.CancelledError()
if self._is_connected.is_set():
return return
self._connection_task = self.loop.create_task(self._connect())
try:
await self._connection_task
finally:
self._connection_task = None
async def _connect(self) -> None:
if self.config["rpc.connection.type"] == "unix": if self.config["rpc.connection.type"] == "unix":
while True: while True:
try: try:
r, w = await asyncio.open_unix_connection(self.config["rpc.connection.path"]) r, w = await asyncio.open_unix_connection(self.config["rpc.connection.path"])
break break
except asyncio.CancelledError:
raise
except: except:
self.log.warning(f'No unix socket available at {self.config["rpc.connection.path"]}, wait for it to exist...') self.log.warning(f'No unix socket available at {self.config["rpc.connection.path"]}, wait for it to exist...')
await asyncio.sleep(10) await asyncio.sleep(10)
@ -71,6 +134,8 @@ class RPCClient:
r, w = await asyncio.open_connection(self.config["rpc.connection.host"], r, w = await asyncio.open_connection(self.config["rpc.connection.host"],
self.config["rpc.connection.port"]) self.config["rpc.connection.port"])
break break
except asyncio.CancelledError:
raise
except: except:
self.log.warning(f'No TCP connection open at {self.config["rpc.connection.host"]}:{self.config["rpc.connection.path"]}, wait for it to become available...') self.log.warning(f'No TCP connection open at {self.config["rpc.connection.host"]}:{self.config["rpc.connection.path"]}, wait for it to become available...')
await asyncio.sleep(10) await asyncio.sleep(10)
@ -78,16 +143,46 @@ class RPCClient:
raise RuntimeError("invalid rpc connection type") raise RuntimeError("invalid rpc connection type")
self._reader = r self._reader = r
self._writer = w self._writer = w
self.loop.create_task(self._try_read_loop()) self._read_task = self.loop.create_task(self._try_read_loop())
self.loop.create_task(self._command_loop()) self._is_connected.set()
self._is_disconnected.clear()
await self.request("register", peer_id=self.config["appservice.address"]) await self.request("register", peer_id=self.config["appservice.address"])
async def disconnect(self) -> None: async def disconnect(self) -> None:
async with self._connection_lock:
if self._is_disconnected.cancelled():
raise asyncio.CancelledError()
if self._is_disconnected.is_set():
return
await self._disconnect()
async def _disconnect(self) -> None:
if self._writer is not None: if self._writer is not None:
self._writer.write_eof() self._writer.write_eof()
await self._writer.drain() await self._writer.drain()
self._writer = None if self._read_task is not None:
self._read_task.cancel()
self._read_task = None
self._on_disconnect()
def _on_disconnect(self) -> None:
self._reader = None self._reader = None
self._writer = None
self._is_connected.clear()
self._is_disconnected.set()
def wait_for_connection(self) -> Awaitable[None]:
return self._is_connected.wait()
def wait_for_disconnection(self) -> Awaitable[None]:
return self._is_disconnected.wait()
def cancel(self) -> None:
self._is_connected.cancel()
self._is_disconnected.cancel()
if self._connection_task is not None:
self._connection_task.cancel()
asyncio.run(self._disconnect())
@property @property
def _next_req_id(self) -> int: def _next_req_id(self) -> int:
@ -119,7 +214,7 @@ class RPCClient:
for handler in handlers: for handler in handlers:
try: try:
await handler(req) await handler(req)
except Exception: except:
self.log.exception("Exception in event handler") self.log.exception("Exception in event handler")
async def _handle_incoming_line(self, line: str) -> None: async def _handle_incoming_line(self, line: str) -> None:
@ -162,7 +257,9 @@ class RPCClient:
async def _try_read_loop(self) -> None: async def _try_read_loop(self) -> None:
try: try:
await self._read_loop() await self._read_loop()
except Exception: except asyncio.CancelledError:
pass
except:
self.log.exception("Fatal error in read loop") self.log.exception("Fatal error in read loop")
async def _read_loop(self) -> None: async def _read_loop(self) -> None:
@ -178,6 +275,8 @@ class RPCClient:
except asyncio.LimitOverrunError as e: except asyncio.LimitOverrunError as e:
self.log.warning(f"Buffer overrun: {e}") self.log.warning(f"Buffer overrun: {e}")
line += await self._reader.read(self._reader._limit) line += await self._reader.read(self._reader._limit)
except asyncio.CancelledError:
raise
if not line: if not line:
continue continue
try: try:
@ -187,11 +286,12 @@ class RPCClient:
continue continue
try: try:
await self._handle_incoming_line(line_str) await self._handle_incoming_line(line_str)
except Exception: except asyncio.CancelledError:
raise
except:
self.log.exception("Failed to handle incoming request %s", line_str) self.log.exception("Failed to handle incoming request %s", line_str)
self.log.debug("Reader disconnected") self.log.debug("Reader disconnected")
self._reader = None self._on_disconnect()
self._writer = None
async def _raw_request(self, command: str, is_secret: bool = False, **data: JSON) -> asyncio.Future[JSON]: async def _raw_request(self, command: str, is_secret: bool = False, **data: JSON) -> asyncio.Future[JSON]:
req_id = self._next_req_id req_id = self._next_req_id
@ -205,5 +305,6 @@ class RPCClient:
return future return future
async def request(self, command: str, **data: JSON) -> JSON: async def request(self, command: str, **data: JSON) -> JSON:
await self.wait_for_connection()
future = await self._raw_request(command, **data) future = await self._raw_request(command, **data)
return await future return await future

View File

@ -85,6 +85,7 @@ class User(DBUser, BaseUser):
_connection_time: float _connection_time: float
_db_instance: DBUser | None _db_instance: DBUser | None
_sync_lock: SimpleLock _sync_lock: SimpleLock
_is_rpc_reconnecting: bool
_logged_in_info: ProfileStruct | None _logged_in_info: ProfileStruct | None
_logged_in_info_time: float _logged_in_info_time: float
@ -121,6 +122,7 @@ class User(DBUser, BaseUser):
self._sync_lock = SimpleLock( self._sync_lock = SimpleLock(
"Waiting for thread sync to finish before handling %s", log=self.log "Waiting for thread sync to finish before handling %s", log=self.log
) )
self._is_rpc_reconnecting = False
self._logged_in_info = None self._logged_in_info = None
self._logged_in_info_time = 0 self._logged_in_info_time = 0
@ -332,6 +334,8 @@ class User(DBUser, BaseUser):
state_event=BridgeStateEvent.UNKNOWN_ERROR, state_event=BridgeStateEvent.UNKNOWN_ERROR,
error_code="kt-reconnection-error", error_code="kt-reconnection-error",
) )
finally:
self._is_rpc_reconnecting = False
async def logout(self, *, remove_ktid: bool = True, reset_device: bool = False) -> None: async def logout(self, *, remove_ktid: bool = True, reset_device: bool = False) -> None:
if self.client: if self.client:
@ -422,7 +426,16 @@ class User(DBUser, BaseUser):
sync_count = num_channels if sync_count < 0 else min(sync_count, num_channels) sync_count = num_channels if sync_count < 0 else min(sync_count, num_channels)
await self.push_bridge_state(BridgeStateEvent.BACKFILLING) await self.push_bridge_state(BridgeStateEvent.BACKFILLING)
self.log.debug(f"Syncing {sync_count} of {num_channels} channels...") self.log.debug(f"Syncing {sync_count} of {num_channels} channels...")
for login_data in login_result.channelList[:sync_count]:
def get_channel_update_time(login_data: LoginDataItem):
channel_info = login_data.channel.info
return channel_info.lastChatLog.sendAt if channel_info.lastChatLog else 0
for login_data in sorted(
login_result.channelList,
reverse=True,
key=get_channel_update_time
)[:sync_count]:
try: try:
await self._sync_channel(login_data) await self._sync_channel(login_data)
except AuthenticationRequired: except AuthenticationRequired:
@ -545,9 +558,8 @@ class User(DBUser, BaseUser):
state = BridgeState(state_event=BridgeStateEvent.UNKNOWN_ERROR) state = BridgeState(state_event=BridgeStateEvent.UNKNOWN_ERROR)
if self.is_connected: if self.is_connected:
state.state_event = BridgeStateEvent.CONNECTED state.state_event = BridgeStateEvent.CONNECTED
# TODO elif self._is_rpc_reconnecting or self.client:
#elif self._is_logged_in and self._is_reconnecting: state.state_event = BridgeStateEvent.TRANSIENT_DISCONNECT
# state.state_event = BridgeStateEvent.TRANSIENT_DISCONNECT
return [state] return [state]
async def get_puppet(self) -> pu.Puppet | None: async def get_puppet(self) -> pu.Puppet | None:
@ -582,16 +594,18 @@ class User(DBUser, BaseUser):
# region KakaoTalk event handling # region KakaoTalk event handling
async def on_connect(self) -> None: async def on_connect(self) -> None:
self.is_connected = True
self._track_metric(METRIC_CONNECTED, True)
""" TODO Don't auto-resync channels if disconnection was too short
now = time.monotonic() now = time.monotonic()
disconnected_at = self._connection_time disconnected_at = self._connection_time
max_delay = self.config["bridge.resync_max_disconnected_time"] max_delay = self.config["bridge.resync_max_disconnected_time"]
first_connect = self.is_connected is None first_connect = self.is_connected is None
self.is_connected = True
self._track_metric(METRIC_CONNECTED, True)
if not first_connect and disconnected_at + max_delay < now: if not first_connect and disconnected_at + max_delay < now:
duration = int(now - disconnected_at) duration = int(now - disconnected_at)
self.log.debug(f"Disconnection lasted {duration} seconds") self.log.debug(f"Disconnection lasted {duration} seconds, not re-syncing channels...")
elif self.temp_disconnect_notices: """
if self.temp_disconnect_notices:
await self.send_bridge_notice("Connected to KakaoTalk chats") await self.send_bridge_notice("Connected to KakaoTalk chats")
await self.push_bridge_state(BridgeStateEvent.CONNECTED) await self.push_bridge_state(BridgeStateEvent.CONNECTED)
@ -618,6 +632,19 @@ class User(DBUser, BaseUser):
await self.logout() await self.logout()
await self.send_bridge_notice(f"Disconnected from KakaoTalk: {reason_str} {reason_suffix}") await self.send_bridge_notice(f"Disconnected from KakaoTalk: {reason_str} {reason_suffix}")
async def on_client_disconnect(self) -> None:
self.is_connected = False
self._track_metric(METRIC_CONNECTED, False)
self.client = None
if self._is_logged_in:
if self.temp_disconnect_notices:
await self.send_bridge_notice(
"Disconnected from KakaoTalk: backend helper module exited. "
"Will reconnect once module resumes."
)
self._is_rpc_reconnecting = True
asyncio.create_task(self.reload_session())
async def on_logged_in(self, oauth_credential: OAuthCredential) -> None: async def on_logged_in(self, oauth_credential: OAuthCredential) -> None:
self.log.debug(f"Successfully logged in as {oauth_credential.userId}") self.log.debug(f"Successfully logged in as {oauth_credential.userId}")
self.oauth_credential = oauth_credential self.oauth_credential = oauth_credential

View File

@ -24,8 +24,10 @@ import {
util, util,
} from "node-kakao" } from "node-kakao"
/** @typedef {import("node-kakao").OAuthCredential} OAuthCredential */ /** @typedef {import("node-kakao").OAuthCredential} OAuthCredential */
/** @typedef {import("node-kakao/dist/talk").TalkChannelList} TalkChannelList */
/** @typedef {import("node-kakao").ChannelType} ChannelType */ /** @typedef {import("node-kakao").ChannelType} ChannelType */
/** @typedef {import("node-kakao").ReplyAttachment} ReplyAttachment */
/** @typedef {import("node-kakao").MentionStruct} MentionStruct */
/** @typedef {import("node-kakao/dist/talk").TalkChannelList} TalkChannelList */
import chat from "node-kakao/chat" import chat from "node-kakao/chat"
const { KnownChatType } = chat const { KnownChatType } = chat
@ -506,13 +508,16 @@ export default class PeerClient {
* @param {string} req.mxid * @param {string} req.mxid
* @param {Object} req.channel_props * @param {Object} req.channel_props
* @param {string} req.text * @param {string} req.text
* @param {?ReplyAttachment} req.reply_to
* @param {?MentionStruct[]} req.mentions
*/ */
sendChat = async (req) => { sendChat = async (req) => {
const talkChannel = await this.#getUserChannel(req.mxid, req.channel_props) const talkChannel = await this.#getUserChannel(req.mxid, req.channel_props)
return await talkChannel.sendChat({ return await talkChannel.sendChat({
type: KnownChatType.TEXT,
text: req.text, text: req.text,
type: !!req.reply_to ? KnownChatType.REPLY : KnownChatType.TEXT,
attachment: !req.mentions ? req.reply_to : {...req.reply_to, mentions: req.mentions},
}) })
} }
@ -521,7 +526,7 @@ export default class PeerClient {
* @param {string} req.mxid * @param {string} req.mxid
* @param {Object} req.channel_props * @param {Object} req.channel_props
* @param {int} req.type * @param {int} req.type
* @param {[number]} req.data * @param {number[]} req.data
* @param {string} req.name * @param {string} req.name
* @param {?int} req.width * @param {?int} req.width
* @param {?int} req.height * @param {?int} req.height

View File

@ -52,9 +52,6 @@ export default class ClientManager {
} catch (err) { } catch (err) {
await fs.promises.mkdir(path.dirname(socketPath), 0o700) await fs.promises.mkdir(path.dirname(socketPath), 0o700)
} }
try {
await fs.promises.unlink(socketPath)
} catch (err) {}
await promisify(cb => this.server.listen(socketPath, cb)) await promisify(cb => this.server.listen(socketPath, cb))
await fs.promises.chmod(socketPath, 0o700) await fs.promises.chmod(socketPath, 0o700)
this.log("Now listening at", socketPath) this.log("Now listening at", socketPath)