Compare commits

...

2 Commits

Author SHA1 Message Date
Andrew Ferrazzutti 99ea716731 3.8-ify one last thing 2022-07-14 23:49:57 -04:00
Andrew Ferrazzutti adb7453e1a Actually make compatible with Python 3.8
- Replace builtin generic type annotations with classes from Typing
- Replace union type expressions with Union and Optional
2022-07-14 01:41:24 -04:00
35 changed files with 323 additions and 310 deletions

View File

@ -15,7 +15,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from typing import Any
from typing import Any, Dict
from mautrix.bridge import Bridge
from mautrix.types import RoomID, UserID
@ -46,7 +46,7 @@ class KakaoTalkBridge(Bridge):
config: Config
matrix: MatrixHandler
#public_website: PublicBridgeWebsite | None
#public_website: Optional[PublicBridgeWebsite] None
def prepare_config(self)->None:
super().prepare_config()
@ -117,7 +117,7 @@ class KakaoTalkBridge(Bridge):
async def count_logged_in_users(self) -> int:
return len([user for user in User.by_ktid.values() if user.ktid])
async def manhole_global_namespace(self, user_id: UserID) -> dict[str, Any]:
async def manhole_global_namespace(self, user_id: UserID) -> Dict[str, Any]:
return {
**await super().manhole_global_namespace(user_id),
"User": User,

View File

@ -15,7 +15,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from typing import TYPE_CHECKING, Awaitable
from typing import TYPE_CHECKING, Awaitable, Optional
import asyncio
from mautrix.bridge.commands import HelpSection, command_handler
@ -209,7 +209,7 @@ class MentionFormatString(EntityString[SimpleEntity, EntityType], MarkdownString
class MentionParser(MatrixParser[MentionFormatString]):
fs = MentionFormatString
async def _get_id_from_mxid(mxid: UserID) -> Long | None:
async def _get_id_from_mxid(mxid: UserID) -> Optional[Long]:
user = await u.User.get_by_mxid(mxid, create=False)
if user and user.ktid:
return user.ktid
@ -294,7 +294,7 @@ async def _edit_friend_by_uuid(evt: CommandEvent, uuid: str, add: bool) -> None:
else:
await _on_friend_edited(evt, friend_struct, add)
async def _on_friend_edited(evt: CommandEvent, friend_struct: FriendStruct | None, add: bool):
async def _on_friend_edited(evt: CommandEvent, friend_struct: Optional[FriendStruct], add: bool):
await evt.reply(f"Friend {'added' if add else 'removed'}")
if friend_struct:
puppet = await pu.Puppet.get_by_ktid(friend_struct.userId)

View File

@ -15,7 +15,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from typing import Any
from typing import Any, List, Tuple
import os
from mautrix.bridge.config import BaseBridgeConfig
@ -31,7 +31,7 @@ class Config(BaseBridgeConfig):
return super().__getitem__(key)
@property
def forbidden_defaults(self) -> list[ForbiddenDefault]:
def forbidden_defaults(self) -> List[ForbiddenDefault]:
return [
*super().forbidden_defaults,
ForbiddenDefault("appservice.database", "postgres://username:password@hostname/db"),
@ -118,14 +118,14 @@ class Config(BaseBridgeConfig):
copy("rpc.connection.host")
copy("rpc.connection.port")
def _get_permissions(self, key: str) -> tuple[bool, bool, bool, str]:
def _get_permissions(self, key: str) -> Tuple[bool, bool, bool, str]:
level = self["bridge.permissions"].get(key, "")
admin = level == "admin"
user = level == "user" or admin
relay = level == "relay" or user
return relay, user, admin, level
def get_permissions(self, mxid: UserID) -> tuple[bool, bool, bool, str]:
def get_permissions(self, mxid: UserID) -> Tuple[bool, bool, bool, str]:
permissions = self["bridge.permissions"] or {}
if mxid in permissions:
return self._get_permissions(mxid)

View File

@ -15,9 +15,8 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from typing import TYPE_CHECKING, ClassVar
from typing import TYPE_CHECKING, ClassVar, Dict, Optional
from asyncpg import Record
from attr import dataclass
from mautrix.types import UserID
@ -35,7 +34,7 @@ class LoginCredential:
password: str
@classmethod
async def get_by_mxid(cls, mxid: UserID) -> LoginCredential | None:
async def get_by_mxid(cls, mxid: UserID) -> Optional[LoginCredential]:
q = "SELECT mxid, email, password FROM login_credential WHERE mxid=$1"
row = await cls.db.fetchrow(q, mxid)
return cls(**row) if row else None
@ -51,7 +50,7 @@ class LoginCredential:
"""
await self.db.execute(q, self.mxid, self.email, self.password)
def get_form(self) -> dict[str, str]:
def get_form(self) -> Dict[str, str]:
return {
"email": self.email,
"password": self.password,

View File

@ -15,7 +15,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from typing import TYPE_CHECKING, ClassVar
from typing import TYPE_CHECKING, ClassVar, List, Optional
from asyncpg import Record
from attr import dataclass, field
@ -34,7 +34,7 @@ class Message:
mxid: EventID
mx_room: RoomID
ktid: Long | None = field(converter=to_optional_long)
ktid: Optional[Long] = field(converter=to_optional_long)
index: int
kt_channel: Long = field(converter=Long)
kt_receiver: Long = field(converter=Long)
@ -45,19 +45,19 @@ class Message:
return cls(**row)
@classmethod
def _from_optional_row(cls, row: Record | None) -> Message | None:
def _from_optional_row(cls, row: Optional[Record]) -> Optional[Message]:
return cls._from_row(row) if row is not None else None
columns = 'mxid, mx_room, ktid, "index", kt_channel, kt_receiver, timestamp'
@classmethod
async def get_all_by_ktid(cls, ktid: int, kt_channel: int, kt_receiver: int) -> list[Message]:
async def get_all_by_ktid(cls, ktid: int, kt_channel: int, kt_receiver: int) -> List[Message]:
q = f"SELECT {cls.columns} FROM message WHERE ktid=$1 AND kt_channel=$2 AND kt_receiver=$3"
rows = await cls.db.fetch(q, ktid, kt_channel, kt_receiver)
return [cls._from_row(row) for row in rows if row]
@classmethod
async def get_by_ktid(cls, ktid: int, kt_channel: int, kt_receiver: int, index: int = 0) -> Message | None:
async def get_by_ktid(cls, ktid: int, kt_channel: int, kt_receiver: int, index: int = 0) -> Optional[Message]:
q = f'SELECT {cls.columns} FROM message WHERE ktid=$1 AND kt_channel=$2 AND kt_receiver=$3 AND "index"=$4'
row = await cls.db.fetchrow(q, ktid, kt_channel, kt_receiver, index)
return cls._from_optional_row(row)
@ -67,13 +67,13 @@ class Message:
await cls.db.execute("DELETE FROM message WHERE mx_room=$1", room_id)
@classmethod
async def get_by_mxid(cls, mxid: EventID, mx_room: RoomID) -> Message | None:
async def get_by_mxid(cls, mxid: EventID, mx_room: RoomID) -> Optional[Message]:
q = f"SELECT {cls.columns} FROM message WHERE mxid=$1 AND mx_room=$2"
row = await cls.db.fetchrow(q, mxid, mx_room)
return cls._from_optional_row(row)
@classmethod
async def get_most_recent(cls, kt_channel: int, kt_receiver: int) -> Message | None:
async def get_most_recent(cls, kt_channel: int, kt_receiver: int) -> Optional[Message]:
q = (
f"SELECT {cls.columns} "
"FROM message WHERE kt_channel=$1 AND kt_receiver=$2 AND ktid IS NOT NULL "
@ -85,7 +85,7 @@ class Message:
@classmethod
async def get_closest_before(
cls, kt_channel: int, kt_receiver: int, ktid: int
) -> Message | None:
) -> Optional[Message]:
q = (
f"SELECT {cls.columns} "
"FROM message WHERE kt_channel=$1 AND kt_receiver=$2 AND ktid<=$3 "
@ -95,7 +95,7 @@ class Message:
return cls._from_optional_row(row)
@classmethod
async def get_all_since(cls, kt_channel: int, kt_receiver: int, since_ktid: Long | None) -> list[Message]:
async def get_all_since(cls, kt_channel: int, kt_receiver: int, since_ktid: Optional[Long]) -> List[Message]:
q = (
f"SELECT {cls.columns} "
"FROM message WHERE kt_channel=$1 AND kt_receiver=$2 AND ktid>=$3 "
@ -116,10 +116,10 @@ class Message:
ktid: Long,
kt_channel: Long,
kt_receiver: Long,
event_ids: list[EventID],
event_ids: List[EventID],
timestamp: int,
mx_room: RoomID,
) -> list[Message]:
) -> List[Message]:
if not event_ids:
return []
columns = [col.strip('"') for col in cls.columns.split(", ")]

View File

@ -15,7 +15,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from typing import TYPE_CHECKING, ClassVar
from typing import TYPE_CHECKING, ClassVar, List, Optional
from asyncpg import Record
from attr import dataclass, field
@ -36,24 +36,24 @@ class Portal:
ktid: Long = field(converter=Long)
kt_receiver: Long = field(converter=Long)
kt_type: ChannelType
mxid: RoomID | None
name: str | None
description: str | None
photo_id: str | None
avatar_url: ContentURI | None
mxid: Optional[RoomID]
name: Optional[str]
description: Optional[str]
photo_id: Optional[str]
avatar_url: Optional[ContentURI]
encrypted: bool
name_set: bool
topic_set: bool
avatar_set: bool
fully_read_kt_chat: Long | None = field(converter=to_optional_long)
relay_user_id: UserID | None
fully_read_kt_chat: Optional[Long] = field(converter=to_optional_long)
relay_user_id: Optional[UserID]
@classmethod
def _from_row(cls, row: Record) -> Portal:
return cls(**row)
@classmethod
def _from_optional_row(cls, row: Record | None) -> Portal | None:
def _from_optional_row(cls, row: Optional[Record]) -> Optional[Portal]:
return cls._from_row(row) if row is not None else None
_columns = (
@ -62,25 +62,25 @@ class Portal:
)
@classmethod
async def get_by_ktid(cls, ktid: int, kt_receiver: int) -> Portal | None:
async def get_by_ktid(cls, ktid: int, kt_receiver: int) -> Optional[Portal]:
q = f"SELECT {cls._columns} FROM portal WHERE ktid=$1 AND kt_receiver=$2"
row = await cls.db.fetchrow(q, ktid, kt_receiver)
return cls._from_optional_row(row)
@classmethod
async def get_by_mxid(cls, mxid: RoomID) -> Portal | None:
async def get_by_mxid(cls, mxid: RoomID) -> Optional[Portal]:
q = f"SELECT {cls._columns} FROM portal WHERE mxid=$1"
row = await cls.db.fetchrow(q, mxid)
return cls._from_optional_row(row)
@classmethod
async def get_all_by_receiver(cls, kt_receiver: int) -> list[Portal]:
async def get_all_by_receiver(cls, kt_receiver: int) -> List[Portal]:
q = f"SELECT {cls._columns} FROM portal WHERE kt_receiver=$1"
rows = await cls.db.fetch(q, kt_receiver)
return [cls._from_row(row) for row in rows if row]
@classmethod
async def all(cls) -> list[Portal]:
async def all(cls) -> List[Portal]:
q = f"SELECT {cls._columns} FROM portal"
rows = await cls.db.fetch(q)
return [cls._from_row(row) for row in rows if row]

View File

@ -15,7 +15,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from typing import TYPE_CHECKING, ClassVar
from typing import TYPE_CHECKING, ClassVar, List, Optional
from asyncpg import Record
from attr import dataclass, field
@ -34,17 +34,17 @@ class Puppet:
db: ClassVar[Database] = fake_db
ktid: Long = field(converter=Long)
name: str | None
photo_id: str | None
photo_mxc: ContentURI | None
name: Optional[str]
photo_id: Optional[str]
photo_mxc: Optional[ContentURI]
name_set: bool
avatar_set: bool
is_registered: bool
custom_mxid: UserID | None
access_token: str | None
next_batch: SyncToken | None
base_url: URL | None
custom_mxid: Optional[UserID]
access_token: Optional[str]
next_batch: Optional[SyncToken]
base_url: Optional[URL]
@classmethod
def _from_row(cls, row: Record) -> Puppet:
@ -53,11 +53,11 @@ class Puppet:
return cls(**data, base_url=URL(base_url) if base_url else None)
@classmethod
def _from_optional_row(cls, row: Record | None) -> Puppet | None:
def _from_optional_row(cls, row: Optional[Record]) -> Optional[Puppet]:
return cls._from_row(row) if row is not None else None
@classmethod
async def get_by_ktid(cls, ktid: int) -> Puppet | None:
async def get_by_ktid(cls, ktid: int) -> Optional[Puppet]:
q = (
"SELECT ktid, name, photo_id, photo_mxc, name_set, avatar_set, is_registered, "
" custom_mxid, access_token, next_batch, base_url "
@ -67,7 +67,7 @@ class Puppet:
return cls._from_optional_row(row)
@classmethod
async def get_by_name(cls, name: str) -> Puppet | None:
async def get_by_name(cls, name: str) -> Optional[Puppet]:
q = (
"SELECT ktid, name, photo_id, photo_mxc, name_set, avatar_set, is_registered, "
" custom_mxid, access_token, next_batch, base_url "
@ -77,7 +77,7 @@ class Puppet:
return cls._from_optional_row(row)
@classmethod
async def get_by_custom_mxid(cls, mxid: UserID) -> Puppet | None:
async def get_by_custom_mxid(cls, mxid: UserID) -> Optional[Puppet]:
q = (
"SELECT ktid, name, photo_id, photo_mxc, name_set, avatar_set, is_registered, "
" custom_mxid, access_token, next_batch, base_url "
@ -87,7 +87,7 @@ class Puppet:
return cls._from_optional_row(row)
@classmethod
async def get_all_with_custom_mxid(cls) -> list[Puppet]:
async def get_all_with_custom_mxid(cls) -> List[Puppet]:
q = (
"SELECT ktid, name, photo_id, photo_mxc, name_set, avatar_set, is_registered, "
" custom_mxid, access_token, next_batch, base_url "

View File

@ -15,7 +15,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from typing import TYPE_CHECKING, ClassVar, List, Set
from typing import TYPE_CHECKING, ClassVar, List, Optional, Set
from asyncpg import Record
from attr import dataclass, field
@ -35,18 +35,18 @@ class User:
mxid: UserID
force_login: bool
was_connected: bool
ktid: Long | None = field(converter=to_optional_long)
uuid: str | None
access_token: str | None
refresh_token: str | None
notice_room: RoomID | None
ktid: Optional[Long] = field(converter=to_optional_long)
uuid: Optional[str]
access_token: Optional[str]
refresh_token: Optional[str]
notice_room: Optional[RoomID]
@classmethod
def _from_row(cls, row: Record) -> User:
return cls(**row)
@classmethod
def _from_optional_row(cls, row: Record | None) -> User | None:
def _from_optional_row(cls, row: Optional[Record]) -> Optional[User]:
return cls._from_row(row) if row is not None else None
_columns = "mxid, force_login, was_connected, ktid, uuid, access_token, refresh_token, notice_room"
@ -58,13 +58,13 @@ class User:
return [cls._from_row(row) for row in rows if row]
@classmethod
async def get_by_ktid(cls, ktid: int) -> User | None:
async def get_by_ktid(cls, ktid: int) -> Optional[User]:
q = f'SELECT {cls._columns} FROM "user" WHERE ktid=$1'
row = await cls.db.fetchrow(q, ktid)
return cls._from_optional_row(row)
@classmethod
async def get_by_mxid(cls, mxid: UserID) -> User | None:
async def get_by_mxid(cls, mxid: UserID) -> Optional[User]:
q = f'SELECT {cls._columns} FROM "user" WHERE mxid=$1'
row = await cls.db.fetchrow(q, mxid)
return cls._from_optional_row(row)

View File

@ -15,7 +15,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from typing import Match
from typing import List, Match, Optional
import re
from mautrix.types import Format, MessageType, TextMessageEventContent
@ -28,7 +28,7 @@ from .. import puppet as pu, user as u
MENTION_REGEX = re.compile(r"@(\d+)\u2063(.+?)\u2063")
async def kakaotalk_to_matrix(msg: str | None, mentions: list[MentionStruct] | None) -> TextMessageEventContent:
async def kakaotalk_to_matrix(msg: Optional[str], mentions: Optional[List[MentionStruct]]) -> TextMessageEventContent:
# TODO Shouts
text = msg or ""
content = TextMessageEventContent(msgtype=MessageType.TEXT, body=text)

View File

@ -15,7 +15,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from typing import NamedTuple
from typing import Dict, List, NamedTuple, Optional
from mautrix.types import Format, MessageEventContent, RelationType, RoomID, UserID
from mautrix.util import utf16_surrogate
@ -40,7 +40,7 @@ from ..db import Message as DBMessage
class SendParams(NamedTuple):
text: str
mentions: list[MentionStruct] | None
mentions: Optional[List[MentionStruct]]
reply_to: ReplyAttachment
@ -89,7 +89,7 @@ class ToKakaoTalkParser(MatrixParser[KakaoTalkFormatString]):
fs = KakaoTalkFormatString
async def _get_id_from_mxid(mxid: UserID, portal: po.Portal) -> Long | None:
async def _get_id_from_mxid(mxid: UserID, portal: po.Portal) -> Optional[Long]:
orig_sender = await u.User.get_by_mxid(mxid, create=False)
if orig_sender and orig_sender.ktid:
return orig_sender.ktid
@ -161,7 +161,7 @@ async def matrix_to_kakaotalk(
):
parsed = await ToKakaoTalkParser().parse(utf16_surrogate.add(content["formatted_body"]))
text = utf16_surrogate.remove(parsed.text)
mentions_by_user: dict[Long, MentionStruct] = {}
mentions_by_user: Dict[Long, MentionStruct] = {}
# Make sure to not create remote mentions for any remote user not in the room
if parsed.entities:
joined_members = set(await portal.main_intent.get_room_members(room_id))
@ -189,7 +189,7 @@ async def matrix_to_kakaotalk(
return SendParams(text=text, mentions=mentions, reply_to=reply_to)
_media_type_reply_body_map: dict[KnownChatType, str] = {
_media_type_reply_body_map: Dict[KnownChatType, str] = {
KnownChatType.PHOTO: "Photo",
KnownChatType.VIDEO: "Video",
KnownChatType.AUDIO: "Voice Note",

View File

@ -22,7 +22,7 @@ with any other potential backend.
from __future__ import annotations
from typing import TYPE_CHECKING, cast, Awaitable, Type, Optional, Union
from typing import TYPE_CHECKING, Awaitable, Dict, List, Optional, Set, Type, Union, cast
import asyncio
from contextlib import asynccontextmanager
import logging
@ -122,7 +122,7 @@ class Client:
# region tokenless commands
@classmethod
async def generate_uuid(cls, used_uuids: set[str]) -> str:
async def generate_uuid(cls, used_uuids: Set[str]) -> str:
"""Randomly generate a UUID for a (fake) device."""
tries_remaining = 10
while True:
@ -160,10 +160,10 @@ class Client:
user: u.User
_rpc_disconnection_task: asyncio.Task | None
_rpc_disconnection_task: Optional[asyncio.Task]
http: ClientSession
log: TraceLogger
_handler_methods: list[str]
_handler_methods: List[str]
def __init__(self, user: u.User, log: Optional[TraceLogger] = None):
"""Create a per-user client object for user-specific client functionality."""
@ -195,7 +195,7 @@ class Client:
def get(
self,
url: Union[str, URL],
headers: Optional[dict[str, str]] = None,
headers: Optional[Dict[str, str]] = None,
sandbox: bool = False,
**kwargs,
) -> _RequestContextManager:
@ -215,7 +215,7 @@ class Client:
# region post-token commands
async def start(self) -> SettingsStruct | None:
async def start(self) -> Optional[SettingsStruct]:
"""
Initialize user-specific bridging & state by providing a token obtained from a prior login.
Receive the user's profile info in response.
@ -252,7 +252,7 @@ class Client:
self.user.oauth_credential = oauth_info.credential
await self.user.save()
async def connect(self) -> LoginResult | None:
async def connect(self) -> Optional[LoginResult]:
"""
Start a new talk session by providing a token obtained from a prior login.
Receive a snapshot of account state in response.
@ -304,14 +304,14 @@ class Client:
channel_props=channel_props.serialize(),
)
def get_participants(self, channel_props: ChannelProps) -> Awaitable[list[UserInfoUnion]]:
def get_participants(self, channel_props: ChannelProps) -> Awaitable[List[UserInfoUnion]]:
return self._api_user_request_result(
ResultListType(UserInfoUnion),
"get_participants",
channel_props=channel_props.serialize(),
)
def get_chats(self, channel_props: ChannelProps, sync_from: Long | None, limit: int | None) -> Awaitable[list[Chatlog]]:
def get_chats(self, channel_props: ChannelProps, sync_from: Optional[Long], limit: Optional[int]) -> Awaitable[List[Chatlog]]:
return self._api_user_request_result(
ResultListType(Chatlog),
"get_chats",
@ -320,7 +320,7 @@ class Client:
limit=limit,
)
def get_read_receipts(self, channel_props: ChannelProps, unread_chat_ids: list[Long]) -> Awaitable[list[Receipt]]:
def get_read_receipts(self, channel_props: ChannelProps, unread_chat_ids: List[Long]) -> Awaitable[List[Receipt]]:
return self._api_user_request_result(
ResultListType(Receipt),
"get_read_receipts",
@ -348,7 +348,7 @@ class Client:
"list_friends",
)
async def edit_friend(self, ktid: Long, add: bool) -> FriendStruct | None:
async def edit_friend(self, ktid: Long, add: bool) -> Optional[FriendStruct]:
try:
friend_req_struct = await self._api_user_request_result(
FriendReqStruct,
@ -361,7 +361,7 @@ class Client:
self.log.exception("Unable to deserialize friend struct, but friend should have been edited nonetheless")
return None
async def edit_friend_by_uuid(self, uuid: str, add: bool) -> FriendStruct | None:
async def edit_friend_by_uuid(self, uuid: str, add: bool) -> Optional[FriendStruct]:
try:
friend_req_struct = await self._api_user_request_result(
FriendReqStruct,
@ -374,7 +374,7 @@ class Client:
self.log.exception("Unable to deserialize friend struct, but friend should have been edited nonetheless")
return None
async def get_friend_dm_id(self, friend_id: Long) -> Long | None:
async def get_friend_dm_id(self, friend_id: Long) -> Optional[Long]:
try:
return await self._api_user_request_result(
Long,
@ -385,7 +385,7 @@ class Client:
self.log.exception(f"Could not find friend with ID {friend_id}")
return None
async def get_memo_ids(self) -> list[Long]:
async def get_memo_ids(self) -> List[Long]:
return ResultListType(Long).deserialize(
await self._rpc_client.request("get_memo_ids", mxid=self.user.mxid)
)
@ -406,8 +406,8 @@ class Client:
self,
channel_props: ChannelProps,
text: str,
reply_to: ReplyAttachment | None,
mentions: list[MentionStruct] | None,
reply_to: Optional[ReplyAttachment],
mentions: Optional[List[MentionStruct]],
) -> Awaitable[Long]:
return self._api_user_request_result(
Long,
@ -425,9 +425,9 @@ class Client:
data: bytes,
filename: str,
*,
width: int | None = None,
height: int | None = None,
ext: str | None = None,
width: Optional[int] = None,
height: Optional[int] = None,
ext: Optional[str] = None,
) -> Awaitable[Long]:
return self._api_user_request_result(
Long,
@ -584,14 +584,14 @@ class Client:
# region listeners
def _on_chat(self, data: dict[str, JSON]) -> Awaitable[None]:
def _on_chat(self, data: Dict[str, JSON]) -> Awaitable[None]:
return self.user.on_chat(
Chatlog.deserialize(data["chatlog"]),
Long.deserialize(data["channelId"]),
str(data["channelType"]),
)
def _on_chat_deleted(self, data: dict[str, JSON]) -> Awaitable[None]:
def _on_chat_deleted(self, data: Dict[str, JSON]) -> Awaitable[None]:
return self.user.on_chat_deleted(
Long.deserialize(data["chatId"]),
Long.deserialize(data["senderId"]),
@ -600,7 +600,7 @@ class Client:
str(data["channelType"]),
)
def _on_chat_read(self, data: dict[str, JSON]) -> Awaitable[None]:
def _on_chat_read(self, data: Dict[str, JSON]) -> Awaitable[None]:
return self.user.on_chat_read(
Long.deserialize(data["chatId"]),
Long.deserialize(data["senderId"]),
@ -608,12 +608,12 @@ class Client:
str(data["channelType"]),
)
def _on_profile_changed(self, data: dict[str, JSON]) -> Awaitable[None]:
def _on_profile_changed(self, data: Dict[str, JSON]) -> Awaitable[None]:
return self.user.on_profile_changed(
OpenLinkChannelUserInfo.deserialize(data["info"]),
)
def _on_perm_changed(self, data: dict[str, JSON]) -> Awaitable[None]:
def _on_perm_changed(self, data: Dict[str, JSON]) -> Awaitable[None]:
return self.user.on_perm_changed(
Long.deserialize(data["userId"]),
OpenChannelUserPerm(data["perm"]),
@ -622,23 +622,23 @@ class Client:
str(data["channelType"]),
)
def _on_channel_added(self, data: dict[str, JSON]) -> Awaitable[None]:
def _on_channel_added(self, data: Dict[str, JSON]) -> Awaitable[None]:
return self.user.on_channel_added(
ChannelInfo.deserialize(data["channelInfo"]),
)
def _on_channel_join(self, data: dict[str, JSON]) -> Awaitable[None]:
def _on_channel_join(self, data: Dict[str, JSON]) -> Awaitable[None]:
return self.user.on_channel_join(
ChannelInfo.deserialize(data["channelInfo"]),
)
def _on_channel_left(self, data: dict[str, JSON]) -> Awaitable[None]:
def _on_channel_left(self, data: Dict[str, JSON]) -> Awaitable[None]:
return self.user.on_channel_left(
Long.deserialize(data["channelId"]),
str(data["channelType"]),
)
def _on_channel_kicked(self, data: dict[str, JSON]) -> Awaitable[None]:
def _on_channel_kicked(self, data: Dict[str, JSON]) -> Awaitable[None]:
return self.user.on_channel_kicked(
Long.deserialize(data["userId"]),
Long.deserialize(data["senderId"]),
@ -646,21 +646,21 @@ class Client:
str(data["channelType"]),
)
def _on_user_join(self, data: dict[str, JSON]) -> Awaitable[None]:
def _on_user_join(self, data: Dict[str, JSON]) -> Awaitable[None]:
return self.user.on_user_join(
Long.deserialize(data["userId"]),
Long.deserialize(data["channelId"]),
str(data["channelType"]),
)
def _on_user_left(self, data: dict[str, JSON]) -> Awaitable[None]:
def _on_user_left(self, data: Dict[str, JSON]) -> Awaitable[None]:
return self.user.on_user_left(
Long.deserialize(data["userId"]),
Long.deserialize(data["channelId"]),
str(data["channelType"]),
)
def _on_channel_meta_change(self, data: dict[str, JSON]) -> Awaitable[None]:
def _on_channel_meta_change(self, data: Dict[str, JSON]) -> Awaitable[None]:
return self.user.on_channel_meta_change(
PortalChannelInfo.deserialize(data["info"]),
Long.deserialize(data["channelId"]),
@ -668,7 +668,7 @@ class Client:
)
def _on_listen_disconnect(self, data: dict[str, JSON]) -> Awaitable[None]:
def _on_listen_disconnect(self, data: Dict[str, JSON]) -> Awaitable[None]:
try:
res = KickoutRes.deserialize(data)
except Exception:
@ -676,18 +676,18 @@ class Client:
res = None
return self._on_disconnect(res)
def _on_switch_server(self, _: dict[str, JSON]) -> Awaitable[None]:
def _on_switch_server(self, _: Dict[str, JSON]) -> Awaitable[None]:
# TODO Reconnect automatically instead
return self._on_disconnect(KickoutRes(KnownKickoutType.CHANGE_SERVER))
def _on_disconnect(self, res: KickoutRes | None) -> Awaitable[None]:
def _on_disconnect(self, res: Optional[KickoutRes]) -> Awaitable[None]:
self._stop_listen()
return self.user.on_disconnect(res)
def _on_error(self, data: dict[str, JSON]) -> Awaitable[None]:
def _on_error(self, data: Dict[str, JSON]) -> Awaitable[None]:
return self.user.on_error(data)
def _on_unexpected_disconnect(self, _: dict[str, JSON]) -> Awaitable[None]:
def _on_unexpected_disconnect(self, _: Dict[str, JSON]) -> Awaitable[None]:
return self.user.on_unexpected_disconnect()

View File

@ -16,7 +16,7 @@
"""Internal helpers for error handling."""
from __future__ import annotations
from typing import NoReturn, Type
from typing import Dict, NoReturn, Type, Union
from .errors import (
CommandException,
@ -35,7 +35,7 @@ def raise_unsuccessful_response(resp: RootCommandResult) -> NoReturn:
raise _error_code_class_map.get(resp.status, CommandException)(resp)
_error_code_class_map: dict[KnownAuthStatusCode | KnownDataStatusCode | int, Type[CommandException]] = {
_error_code_class_map: Dict[Union[KnownAuthStatusCode, KnownDataStatusCode, int], Type[CommandException]] = {
#KnownAuthStatusCode.INVALID_PHONE_NUMBER: "Invalid phone number",
#KnownAuthStatusCode.SUCCESS_WITH_ACCOUNT: "Success",
#KnownAuthStatusCode.SUCCESS_WITH_DEVICE_CHANGED: "Success (device changed)",

View File

@ -16,6 +16,8 @@
"""Helper functions & types for status codes for the KakaoTalk API."""
from __future__ import annotations
from typing import Dict, Union
from ..types.api.auth_api_client import KnownAuthStatusCode
from ..types.request import KnownDataStatusCode, RootCommandResult
@ -72,7 +74,7 @@ class ResponseError(Exception):
pass
_status_code_message_map: dict[KnownAuthStatusCode | KnownDataStatusCode | int, str] = {
_status_code_message_map: Dict[Union[KnownAuthStatusCode, KnownDataStatusCode, int], str] = {
KnownAuthStatusCode.INVALID_PHONE_NUMBER: "Invalid phone number",
KnownAuthStatusCode.SUCCESS_WITH_ACCOUNT: "Success",
KnownAuthStatusCode.SUCCESS_WITH_DEVICE_CHANGED: "Success (device changed)",

View File

@ -15,7 +15,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Custom wrapper classes around types defined by the KakaoTalk API."""
from typing import Optional, NewType, Union
from typing import Dict, List, NewType, Optional, Union
from attr import dataclass
@ -73,8 +73,8 @@ class Receipt(SerializableAttrs):
@dataclass
class PortalChannelParticipantInfo(SerializableAttrs):
participants: list[UserInfoUnion]
kickedUserIds: list[Long]
participants: List[UserInfoUnion]
kickedUserIds: List[Long]
@dataclass
class PortalChannelInfo(SerializableAttrs):
@ -92,7 +92,7 @@ class ChannelProps(SerializableAttrs):
# TODO Add non-media types, like polls & maps
TO_MSGTYPE_MAP: dict[MessageType, KnownChatType] = {
TO_MSGTYPE_MAP: Dict[MessageType, KnownChatType] = {
MessageType.TEXT: KnownChatType.TEXT,
MessageType.IMAGE: KnownChatType.PHOTO,
MessageType.STICKER: KnownChatType.PHOTO,
@ -102,13 +102,13 @@ TO_MSGTYPE_MAP: dict[MessageType, KnownChatType] = {
}
# https://stackoverflow.com/a/483833
FROM_MSGTYPE_MAP: dict[KnownChatType, MessageType] = {v: k for k, v in TO_MSGTYPE_MAP.items()}
FROM_MSGTYPE_MAP: Dict[KnownChatType, MessageType] = {v: k for k, v in TO_MSGTYPE_MAP.items()}
# TODO Consider allowing custom power/perm mappings
# But must update default user level & permissions to match!
FROM_PERM_MAP: dict[OpenChannelUserPerm, int] = {
FROM_PERM_MAP: Dict[OpenChannelUserPerm, int] = {
OpenChannelUserPerm.OWNER: 100,
OpenChannelUserPerm.MANAGER: 50,
# TODO Decide on an appropriate value for this

View File

@ -13,7 +13,7 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Optional, Union
from typing import Dict, List, Optional, Union
from attr import dataclass
@ -54,11 +54,11 @@ class MoreSettingsStruct(SerializableAttrs):
@dataclass
class MoreApps(SerializableAttrs):
recommend: Optional[list[str]] = None # NOTE From unknown[]
all: Optional[list[str]] = None # NOTE From unknown[]
recommend: Optional[List[str]] = None # NOTE From unknown[]
all: Optional[List[str]] = None # NOTE From unknown[]
moreApps: MoreApps
shortcuts: Optional[dict[str, int]] = None # NOTE Made optional
shortcuts: Optional[Dict[str, int]] = None # NOTE Made optional
seasonProfileRev: int
seasonNoticeRev: int
serviceUserId: Union[Long, int]
@ -86,8 +86,8 @@ class MoreSettingsStruct(SerializableAttrs):
@dataclass(kw_only=True)
class LessSettingsStruct(SerializableAttrs):
kakaoAutoLoginDomain: list[str]
daumSsoDomain: list[str]
kakaoAutoLoginDomain: List[str]
daumSsoDomain: List[str]
@dataclass
class GoogleMapsApi(SerializableAttrs):
@ -124,7 +124,7 @@ class LessSettingsStruct(SerializableAttrs):
newPostTerm: int
postExpirationSetting: PostExpirationSetting
kakaoAlertIds: list[int]
kakaoAlertIds: List[int]
@dataclass

View File

@ -13,6 +13,8 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import List
from attr import dataclass
from mautrix.types import SerializableAttrs
@ -23,7 +25,7 @@ from .friend_struct import FriendStruct
@dataclass
class FriendBlockedListStruct(SerializableAttrs):
total: int
blockedFriends: list[FriendStruct]
blockedFriends: List[FriendStruct]
__all__ = [

View File

@ -13,6 +13,8 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import List
from attr import dataclass
from mautrix.types import SerializableAttrs
@ -24,7 +26,7 @@ from .friend_struct import FriendStruct
@dataclass
class FriendListStruct(SerializableAttrs):
token: Long
friends: list[FriendStruct]
friends: List[FriendStruct]
__all__ = [

View File

@ -13,7 +13,7 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Optional
from typing import List, Optional
from attr import dataclass
@ -25,7 +25,7 @@ from .friend_struct import FriendStruct
@dataclass
class FriendSearchUserListStruct(SerializableAttrs):
count: int
list: list[FriendStruct]
list: List[FriendStruct]
@dataclass(kw_only=True)
@ -33,7 +33,7 @@ class FriendSearchStruct(SerializableAttrs):
query: str
user: Optional[FriendSearchUserListStruct] = None
plus: Optional[FriendSearchUserListStruct] = None
categories: list[str]
categories: List[str]
total_counts: int

View File

@ -13,7 +13,7 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Optional, Union
from typing import List, Optional, Union
from attr import dataclass
@ -34,7 +34,7 @@ class ProfileFeed(SerializableAttrs):
serviceName: str
typeIconUrl: str
downloadId: str
contents: list[ProfileFeedObject]
contents: List[ProfileFeedObject]
url: str
serviceUrl: Optional[str] = None # NOTE Made optional
webUrl: str
@ -51,7 +51,7 @@ class ProfileFeed(SerializableAttrs):
@dataclass(kw_only=True)
class ProfileFeedList(SerializableAttrs):
totalCnt: int # NOTE Renamed from "totalCnts"
feeds: list[ProfileFeed]
feeds: List[ProfileFeed]
@dataclass
@ -91,7 +91,7 @@ class ProfileStruct(SerializableAttrs):
profileImageUrl: str
fullProfileImageUrl: str
originalProfileImageUrl: str
decoration: list[ProfileDecoration]
decoration: List[ProfileDecoration]
profileFeeds: ProfileFeedList
backgroundFeeds: ProfileFeedList
allowStory: bool

View File

@ -13,7 +13,7 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Generic, TypeVar, Optional
from typing import Dict, Generic, List, Optional, TypeVar
from attr import dataclass
@ -37,7 +37,7 @@ class SetChannelMeta(ChannelMeta):
authorId: Long
updatedAt: int
ChannelMetaMap = dict[ChannelMetaType, SetChannelMeta] # Substitute for Record<ChannelMetaType, SetChannelMeta>
ChannelMetaMap = Dict[ChannelMetaType, SetChannelMeta] # Substitute for Record<ChannelMetaType, SetChannelMeta>
@dataclass(kw_only=True)
@ -50,7 +50,7 @@ class ChannelInfo(Channel):
lastSeenLogId: Long
lastChatLog: Optional[Chatlog] = None
metaMap: ChannelMetaMap
displayUserList: list[DisplayUserInfo]
displayUserList: List[DisplayUserInfo]
pushAlert: bool

View File

@ -13,7 +13,7 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Optional, Union
from typing import List, Optional, Union
from attr import dataclass
from enum import Enum, IntEnum
@ -149,10 +149,10 @@ BotDelCommandStruct = BotCommandStruct
@dataclass(kw_only=True)
class BotMetaContent(SerializableAttrs):
add: Optional[list[BotAddCommandStruct]] = None
update: Optional[list[BotAddCommandStruct]] = None
full: Optional[list[BotAddCommandStruct]] = None
delete: Optional[list[BotDelCommandStruct]] = field(json="del", default=None)
add: Optional[List[BotAddCommandStruct]] = None
update: Optional[List[BotAddCommandStruct]] = None
full: Optional[List[BotAddCommandStruct]] = None
delete: Optional[List[BotDelCommandStruct]] = field(json="del", default=None)
__all__ = [

View File

@ -13,7 +13,7 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Optional
from typing import List, Optional
from attr import dataclass
@ -25,8 +25,8 @@ from .mention import *
@dataclass(kw_only=True)
class Attachment(SerializableAttrs):
shout: Optional[bool] = None
mentions: Optional[list[MentionStruct]] = None
urls: Optional[list[str]] = None
mentions: Optional[List[MentionStruct]] = None
urls: Optional[List[str]] = None
@dataclass

View File

@ -13,7 +13,7 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Optional
from typing import List, Optional
from attr import dataclass
@ -46,16 +46,16 @@ class PhotoAttachment(MediaKeyAttachment):
@dataclass
class MultiPhotoAttachment(Attachment):
kl: list[str]
wl: list[int]
hl: list[int]
csl: list[str]
imageUrls: list[str]
thumbnailUrls: list[str]
thumbnailWidths: list[int]
thumbnailHeights: list[int]
sl: list[int] # NOTE Changed to a list
mtl: list[str] # NOTE Added
kl: List[str]
wl: List[int]
hl: List[int]
csl: List[str]
imageUrls: List[str]
thumbnailUrls: List[str]
thumbnailWidths: List[int]
thumbnailHeights: List[int]
sl: List[int] # NOTE Changed to a list
mtl: List[str] # NOTE Added
@dataclass

View File

@ -13,7 +13,7 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Union
from typing import List, Union
from attr import dataclass
@ -24,7 +24,7 @@ from ...bson import Long