matrix-puppeteer-line/matrix_puppeteer_line/puppet.py

173 lines
6.5 KiB
Python
Raw Normal View History

2021-03-15 01:40:56 -04:00
# matrix-puppeteer-line - A very hacky Matrix-LINE bridge based on running LINE's Chrome extension in Puppeteer
2021-02-26 01:28:54 -05:00
# Copyright (C) 2020-2021 Tulir Asokan, Andrew Ferrazzutti
2020-08-28 09:38:06 -04:00
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Optional, Dict, TYPE_CHECKING, cast
from mautrix.bridge import BasePuppet
2021-03-26 02:27:21 -04:00
from mautrix.types import UserID, ContentURI
2020-08-28 09:38:06 -04:00
from mautrix.util.simple_template import SimpleTemplate
from .db import Puppet as DBPuppet
from .config import Config
2021-03-26 02:27:21 -04:00
from .rpc import Participant, Client, PathImage
2020-08-28 09:38:06 -04:00
from . import user as u
if TYPE_CHECKING:
from .__main__ import MessagesBridge
class Puppet(DBPuppet, BasePuppet):
by_mid: Dict[str, 'Puppet'] = {}
hs_domain: str
mxid_template: SimpleTemplate[str]
bridge: 'MessagesBridge'
config: Config
default_mxid: UserID
2021-03-26 02:27:21 -04:00
def __init__(self, mid: str, name: Optional[str] = None,
avatar_path: Optional[str] = None, avatar_mxc: Optional[ContentURI] = None,
name_set: bool = False, avatar_set: bool = False,
2021-03-23 02:37:30 -04:00
is_registered: bool = False) -> None:
2021-03-26 02:27:21 -04:00
super().__init__(mid, name, avatar_path, avatar_mxc, name_set, avatar_set, is_registered)
2020-08-28 09:38:06 -04:00
self.log = self.log.getChild(mid)
self.default_mxid = self.get_mxid_from_id(mid)
self.intent = self.az.intent.user(self.default_mxid)
@classmethod
def init_cls(cls, bridge: 'MessagesBridge') -> None:
cls.config = bridge.config
cls.loop = bridge.loop
cls.mx = bridge.matrix
cls.az = bridge.az
cls.bridge = bridge
cls.hs_domain = cls.config["homeserver.domain"]
cls.mxid_template = SimpleTemplate(cls.config["bridge.username_template"], "userid",
prefix="@", suffix=f":{cls.hs_domain}", type=str)
secret = cls.config["bridge.login_shared_secret"]
2021-02-10 02:34:19 -05:00
if secret:
cls.login_shared_secret_map[cls.hs_domain] = secret.encode("utf-8")
cls.login_device_name = "LINE Bridge"
2020-08-28 09:38:06 -04:00
2021-03-26 02:27:21 -04:00
async def update_info(self, info: Participant, client: Optional[Client]) -> None:
2020-08-28 09:38:06 -04:00
update = False
update = await self._update_name(info.name) or update
2021-03-26 02:27:21 -04:00
if client:
update = await self._update_avatar(info.avatar, client) or update
2020-08-28 09:38:06 -04:00
if update:
await self.update()
async def _update_name(self, name: str) -> bool:
name = self.config["bridge.displayname_template"].format(displayname=name)
2021-03-26 02:27:21 -04:00
if name != self.name or not self.name_set:
2020-08-28 09:38:06 -04:00
self.name = name
2021-03-26 02:27:21 -04:00
try:
await self.intent.set_displayname(self.name)
self.name_set = True
except Exception:
self.log.exception("Failed to set displayname")
self.name_set = False
2020-08-28 09:38:06 -04:00
return True
return False
2021-03-26 02:27:21 -04:00
async def _update_avatar(self, avatar: Optional[PathImage], client: Client) -> bool:
if avatar:
if avatar.url and not avatar.path:
if self.avatar_set and self.avatar_path:
self.log.warn(f"Not updating user avatar of {self.name}: new avatar exists, but in a form that cannot be uniquely identified")
return False
else:
self.log.warn(f"Using URL as path for user avatar of {self.name}: no previous avatar exists")
avatar_path = avatar_url = avatar.url
else:
avatar_path = avatar.path
avatar_url = avatar.url
else:
avatar_path = avatar_url = None
if not self.avatar_set or avatar_path != self.avatar_path:
self.log.info(f"Updating user avatar of {self.name}")
2021-03-26 02:27:21 -04:00
self.avatar_path = avatar_path
if avatar_url:
2021-03-26 02:27:21 -04:00
resp = await client.read_image(avatar.url)
self.avatar_mxc = await self.intent.upload_media(resp.data, mime_type=resp.mime)
else:
self.avatar_mxc = ContentURI("")
try:
await self.intent.set_avatar_url(self.avatar_mxc)
self.avatar_set = True
except Exception as e:
self.log.exception(f"Failed to set user avatar: {e}")
2021-03-26 02:27:21 -04:00
self.avatar_set = False
2021-03-23 02:37:30 -04:00
return True
else:
self.log.debug(f"No need to update user avatar of {self.name}, new avatar has same path as old one")
return False
2021-03-23 02:37:30 -04:00
2020-08-28 09:38:06 -04:00
def _add_to_cache(self) -> None:
self.by_mid[self.mid] = self
async def save(self) -> None:
await self.update()
@classmethod
async def get_by_mxid(cls, mxid: UserID, create: bool = True) -> Optional['Puppet']:
mid = cls.get_id_from_mxid(mxid)
if mid:
return await cls.get_by_mid(mid, create)
return None
@classmethod
def get_id_from_mxid(cls, mxid: UserID) -> Optional[str]:
return cls.mxid_template.parse(mxid)
@classmethod
def get_mxid_from_id(cls, mid: str) -> UserID:
return UserID(cls.mxid_template.format_full(mid))
@classmethod
async def get_by_mid(cls, mid: str, create: bool = True) -> Optional['Puppet']:
2021-02-25 22:21:11 -05:00
# TODO Might need to parse a real id from "_OWN"
2020-08-28 09:38:06 -04:00
try:
return cls.by_mid[mid]
except KeyError:
pass
puppet = cast(cls, await super().get_by_mid(mid))
if puppet is not None:
puppet._add_to_cache()
return puppet
if create:
puppet = cls(mid)
await puppet.insert()
puppet._add_to_cache()
return puppet
return None
# TODO When supporting multiple bridge users, this should return the user whose puppet this is
@classmethod
def is_mid_for_own_puppet(cls, mid) -> bool:
return mid.startswith("_OWN_") if mid else False
2020-08-28 09:38:06 -04:00
@classmethod
async def get_by_custom_mxid(cls, mxid: UserID) -> Optional['u.User']:
2021-05-05 02:40:28 -04:00
if mxid == cls.config["bridge.user"]:
return await cls.bridge.get_user(mxid)
2020-08-28 09:38:06 -04:00
return None