121 lines
4.5 KiB
Python
121 lines
4.5 KiB
Python
# mautrix-line - A very hacky Matrix-LINE bridge based on using LINE's Chrome Store Extension in Puppeteer
|
|
# Copyright (C) 2021 Tulir Asokan, Andrew Ferrazzutti
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
from typing import AsyncGenerator, TypedDict, List, Tuple, Dict, Callable, Awaitable, Any
|
|
from collections import deque
|
|
import asyncio
|
|
|
|
from .rpc import RPCClient
|
|
from .types import ChatListInfo, ChatInfo, Message, StartStatus
|
|
from mautrix_line.rpc.types import RPCError
|
|
|
|
|
|
class LoginCommand(TypedDict, total=False):
    """Payload of a login-related event passed to the handlers in ``Client.login``.

    The dict is non-total because each event carries a different key: the
    handlers in this module read ``url`` from "qr" events, ``pin`` from "pin"
    events and (optionally) ``reason`` from "failure" events.
    """

    # Originally the only declared key; kept for backward compatibility.
    # NOTE(review): nothing in this module reads it -- confirm it is still sent.
    content: str
    # Read by the "qr" event handler.
    url: str
    # Read by the "pin" event handler.
    pin: str
    # Read (via .get) by the "failure" event handler.
    # NOTE(review): assumed to be a string -- confirm against the sender.
    reason: str
|
|
|
|
|
|
class Client(RPCClient):
    """Typed convenience wrapper around :class:`RPCClient`.

    Each method issues one named RPC request through ``self.request`` (or
    ``self._raw_request``) and converts the response into the matching type.
    """

    async def start(self) -> StartStatus:
        """Connect to the RPC server and send the ``start`` request.

        Returns:
            The response deserialized into a :class:`StartStatus`.
        """
        await self.connect()
        return StartStatus.deserialize(await self.request("start"))

    async def stop(self) -> None:
        """Send the ``stop`` request, then disconnect from the RPC server."""
        await self.request("stop")
        await self.disconnect()

    async def get_chats(self) -> List[ChatListInfo]:
        """Send ``get_chats`` and deserialize every entry of the response."""
        resp = await self.request("get_chats")
        return [ChatListInfo.deserialize(data) for data in resp]

    async def get_chat(self, chat_id: int) -> ChatInfo:
        """Send ``get_chat`` for ``chat_id`` and deserialize the response."""
        return ChatInfo.deserialize(await self.request("get_chat", chat_id=chat_id))

    async def get_messages(self, chat_id: int) -> List[Message]:
        """Send ``get_messages`` for ``chat_id`` and deserialize every entry."""
        resp = await self.request("get_messages", chat_id=chat_id)
        return [Message.deserialize(data) for data in resp]

    async def is_connected(self) -> bool:
        """Send ``is_connected`` and return the response's ``is_connected`` field."""
        resp = await self.request("is_connected")
        return resp["is_connected"]

    async def send(self, chat_id: int, text: str) -> int:
        """Send ``text`` to ``chat_id`` and return the ``id`` field of the response."""
        resp = await self.request("send", chat_id=chat_id, text=text)
        return resp["id"]

    async def set_last_message_ids(self, msg_ids: Dict[int, int]) -> None:
        """Send ``set_last_message_ids`` with the given mapping.

        NOTE(review): presumably maps chat ID -> last seen message ID; confirm
        against the RPC server.
        """
        await self.request("set_last_message_ids", msg_ids=msg_ids)

    async def on_message(self, func: Callable[[Message], Awaitable[None]]) -> None:
        """Register ``func`` as the handler for ``message`` events.

        The raw event's ``message`` entry is deserialized into a
        :class:`Message` before ``func`` is awaited with it.
        """

        async def wrapper(data: Dict[str, Any]) -> None:
            await func(Message.deserialize(data["message"]))

        self.add_event_handler("message", wrapper)

    # TODO Type hint for sender
    async def login(self, sender, **login_data) -> AsyncGenerator[Tuple[str, str], None]:
        """Start a login and yield its progress events until it finishes.

        Args:
            sender: Object exposing a ``command_status`` attribute. Its
                ``["login_type"]`` entry is sent with the request, and once
                ``command_status`` becomes ``None`` a ``cancel_login`` request
                is sent.
            **login_data: Extra keyword arguments forwarded with the ``login``
                request.

        Yields:
            ``("qr", url)``, ``("pin", pin)`` and ``("failure", reason)`` for the
            corresponding RPC events, and ``("error", message)`` if the login
            request itself fails or is cancelled. The generator ends once the
            login request completes.
        """
        login_data["login_type"] = sender.command_status["login_type"]

        # Handlers and the done-callback push items here; ``None`` marks the end.
        data = deque()
        event = asyncio.Event()

        async def qr_handler(req: LoginCommand) -> None:
            data.append(("qr", req["url"]))
            event.set()

        async def pin_handler(req: LoginCommand) -> None:
            data.append(("pin", req["pin"]))
            event.set()

        async def failure_handler(req: LoginCommand) -> None:
            data.append(("failure", req.get("reason")))
            event.set()

        async def cancel_watcher() -> None:
            # Poll until the command is dropped, then cancel the login remotely.
            try:
                while sender.command_status is not None:
                    await asyncio.sleep(1)
                await self._raw_request("cancel_login")
            except asyncio.CancelledError:
                pass

        def login_handler(_fut: asyncio.Future) -> None:
            cancel_watcher_task.cancel()
            if _fut.cancelled():
                # Future.exception() raises CancelledError on a cancelled future,
                # which would skip the sentinel below and hang the generator.
                data.append(("error", "Login request was cancelled"))
            else:
                e = _fut.exception()
                if e is not None:
                    data.append(("error", str(e)))
            data.append(None)
            event.set()

        # Register the handlers before the request is sent so no event that
        # arrives while the request is being awaited can be missed.
        self.add_event_handler("qr", qr_handler)
        self.add_event_handler("pin", pin_handler)
        self.add_event_handler("failure", failure_handler)
        cancel_watcher_task = asyncio.create_task(cancel_watcher())
        try:
            login_future = await self._raw_request("login", **login_data)
            login_future.add_done_callback(login_handler)

            while True:
                await event.wait()
                while data:
                    item = data.popleft()
                    if item is None:
                        return
                    yield item
                # No await between the drain and the clear, so nothing is lost.
                event.clear()
        finally:
            # Also reached when the consumer closes the generator early or the
            # request fails; stop the watcher so it cannot outlive this call.
            cancel_watcher_task.cancel()
            self.remove_event_handler("qr", qr_handler)
            self.remove_event_handler("pin", pin_handler)
            self.remove_event_handler("failure", failure_handler)
|