matrix-appservice-kakaotalk/matrix_appservice_kakaotalk/commands/auth.py

370 lines
13 KiB
Python

# matrix-appservice-kakaotalk - A Matrix-KakaoTalk puppeting bridge.
# Copyright (C) 2022 Tulir Asokan, Andrew Ferrazzutti
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#import time
#from yarl import URL
from mautrix.bridge.commands import HelpSection, command_handler
from mautrix.errors import MForbidden
#from mautrix.util.signed_token import sign_token
from ..kt.client import Client as KakaoTalkClient
from ..kt.client.errors import (
AnotherLogonDetected,
CommandException,
DeviceVerificationRequired,
IncorrectPasscode,
IncorrectPassword,
)
from .. import puppet as pu
from ..db import LoginCredential
from .typehint import CommandEvent
SECTION_AUTH = HelpSection("Authentication", 10, "")
""" TODO Implement web login
web_unsupported = (
"This instance of the KakaoTalk bridge does not support the web-based login interface"
)
alternative_web_login = (
"Alternatively, you may use [the web-based login interface]({url}) "
"to prevent the bridge and homeserver from seeing your password"
)
forced_web_login = (
"This instance of the KakaoTalk bridge does not allow in-Matrix login. "
"Please use [the web-based login interface]({url})."
)
"""
send_password = "Please send your password here to log in"
missing_email = "Please use `$cmdprefix+sp login <email>` to log in here"
try_again_or_cancel = "Try again, or say `$cmdprefix+sp cancel` to give up."
_CMD_CONTINUE_LOGIN = "continue-login"
@command_handler(
needs_auth=False,
management_only=True,
help_section=SECTION_AUTH,
help_text="Log in to KakaoTalk. Optionally save your password so the bridge may use it when needed to restore your login session",
help_args="[--save] <_email_>",
)
async def login(evt: CommandEvent) -> None:
if await evt.sender.is_logged_in():
await evt.reply("You're already logged in")
return
num_args = len(evt.args)
save = num_args > 0 and evt.args[0] == "--save"
# TODO Once web login is implemented, don't make <email> a mandatory argument
if not save and num_args != 1:
await evt.reply("**Usage:** `$cmdprefix+sp login [--save] <email>`")
return
email = evt.args[0 if not save else 1] if num_args > 0 else None
if email:
try:
creds = await LoginCredential.get_by_mxid(evt.sender.mxid)
except:
evt.log.exception("Exception while looking for saved password")
creds = None
if creds and creds.email == email:
await evt.reply("Logging in with saved password")
evt.sender.command_status = {
"action": "Login with saved password",
"room_id": evt.room_id,
"save": True,
}
await _login_with_password(evt, email, creds.password, evt.sender.force_login)
return
evt.sender.command_status = {
"action": "Login",
"room_id": evt.room_id,
"next": _enter_password,
"email": email,
"save": save,
"forced": evt.sender.force_login,
}
""" TODO Implement web login
if evt.bridge.public_website:
external_url = URL(evt.config["appservice.public.external"])
token = sign_token(
evt.bridge.public_website.secret_key,
{
"mxid": evt.sender.mxid,
"expiry": int(time.time()) + 30 * 60,
},
)
url = (external_url / "login.html").with_fragment(token)
if not evt.config["appservice.public.allow_matrix_login"]:
await evt.reply(forced_web_login.format(url=url))
elif email:
await evt.reply(f"{send_password}. {alternative_web_login.format(url=url)}.")
else:
await evt.reply(f"{missing_email}. {alternative_web_login.format(url=url)}.")
elif not email:
await evt.reply(f"{missing_email}. {web_unsupported}.")
else:
await evt.reply(f"{send_password}. {web_unsupported}.")
"""
if not email:
await evt.reply(f"{missing_email}.")
else:
save_warning = (
" NOTE: With `--save`, the bridge will store your KakaoTalk password **unencrypted** until you discard it with `$cmdprefix+sp forget-password`."
" If you would rather not have the bridge save your password, type `$cmdprefix+sp cancel`, then `$cmdprefix+sp login <email>` without `--save`."
) if save else ""
await evt.reply(f"{send_password}.{save_warning}")
async def _enter_password(evt: CommandEvent) -> None:
try:
await evt.az.intent.redact(evt.room_id, evt.event_id)
except MForbidden:
await evt.mark_read()
assert evt.sender.command_status
await _login_with_password(
evt,
evt.sender.command_status.pop("email"),
evt.content.body,
evt.sender.command_status.pop("forced"),
)
async def _login_with_password(evt: CommandEvent, email: str, password: str, forced: bool) -> None:
req = {
"uuid": await evt.sender.get_uuid(),
"form": {
"email": email,
"password": password,
},
"forced": forced,
}
await _try_login(evt, req)
async def _try_login(evt: CommandEvent, req: dict) -> None:
save = (
evt.sender.command_status.get("save", False)
if evt.sender.command_status
else False
)
try:
await _do_login(evt, req, save)
except DeviceVerificationRequired as e:
if evt.sender.command_status and evt.sender.command_status.get("next") != _enter_dv_code:
await evt.reply(
"Open KakaoTalk on your smartphone. It should show a device registration passcode. "
"Enter that passcode here."
)
evt.sender.command_status = {
"action": "Login",
"room_id": evt.room_id,
"next": _enter_dv_code,
"save": save,
"req": req,
}
else:
evt.log.error("Device registration still required after having just registered")
await _handle_login_failure(evt, e)
except AnotherLogonDetected as e:
if not req["forced"]:
await evt.reply(
"You are currently logged in to KakaoTalk on a PC or another bridge. "
"In order to log in to this bridge, you will be forced to log out from your other session. "
f"To proceed, type `$cmdprefix+sp {_CMD_CONTINUE_LOGIN}`. Otherwise, type `$cmdprefix+sp cancel`."
)
evt.sender.command_status = {
"action": "Login",
"room_id": evt.room_id,
"next": _force_login,
"save": save,
"req": req,
}
else:
evt.log.error("Failed to force-login while logged in elsewhere")
await _handle_login_failure(evt, e)
except IncorrectPassword:
await evt.reply(f"Incorrect password. {try_again_or_cancel}")
evt.sender.command_status = {
"action": "Login",
"room_id": evt.room_id,
"next": _enter_password,
"email": req["form"]["email"],
"save": save,
"forced": req["forced"],
}
#except OAuthException as e:
# await evt.reply(f"Error from KakaoTalk:\n\n> {e}")
except Exception as e:
await _handle_login_failure(evt, e)
async def _enter_dv_code(evt: CommandEvent) -> None:
assert evt.sender.command_status
req: dict = evt.sender.command_status.pop("req")
passcode = evt.content.body
await evt.mark_read()
try:
await KakaoTalkClient.register_device(passcode, **req)
except IncorrectPasscode:
await evt.reply(f"Incorrect device registration passcode. {try_again_or_cancel}")
#except OAuthException as e:
# await evt.reply(f"Error from KakaoTalk:\n\n> {e}")
except Exception as e:
await _handle_login_failure(evt, e)
else:
await _try_login(evt, req)
async def _force_login(evt: CommandEvent) -> None:
command = evt.content.body
if command != _CMD_CONTINUE_LOGIN:
await evt.reply(
f"Unknown action `{command}`. "
f"Use `$cmdprefix+sp {_CMD_CONTINUE_LOGIN}` or `$cmdprefix+sp cancel`."
)
return
assert evt.sender.command_status
evt.sender.command_status["req"]["forced"] = True
await evt.mark_read()
await _try_login(evt, evt.sender.command_status.pop("req"))
async def _do_login(evt: CommandEvent, req: dict, save: bool) -> None:
oauth_credential = await KakaoTalkClient.login(**req)
await evt.sender.on_logged_in(oauth_credential)
evt.sender.command_status = None
await evt.reply("Successfully logged in")
try:
if save:
email = req["form"]["email"]
password = req["form"]["password"]
creds = await LoginCredential.get_by_mxid(evt.sender.mxid)
if creds:
evt.log.warning("Found pre-existing login credentials, so updating them now")
creds.email = email
creds.password = password
await creds.save()
else:
await LoginCredential(evt.sender.mxid, email, password).insert()
else:
await LoginCredential.delete_by_mxid(evt.sender.mxid)
except:
evt.log.exception("Error updating saved credentials after successful login")
async def _handle_login_failure(evt: CommandEvent, e: Exception) -> None:
evt.sender.command_status = None
if isinstance(e, CommandException):
message = "Failed to log in"
evt.log.error(message)
else:
message = "Error while logging in"
evt.log.exception(message)
await evt.reply(f"{message}: {e}")
@command_handler(
needs_auth=True,
management_only=True,
help_section=SECTION_AUTH,
help_text="Delete saved login password, if it was saved",
)
async def forget_password(evt: CommandEvent) -> None:
creds = await LoginCredential.get_by_mxid(evt.sender.mxid)
if not creds:
await evt.reply("The bridge wasn't storing your password, so there was nothing to forget.")
else:
await creds.delete()
await evt.reply(
"This bridge is no longer storing your password. "
"If you get logged out unexpectedly, you will have to manually log back in."
)
@command_handler(
needs_auth=True,
help_section=SECTION_AUTH,
help_text="Log out of KakaoTalk (and optionally change your virtual device ID for next login)",
help_args="[--reset-device]",
)
async def logout(evt: CommandEvent) -> None:
if len(evt.args) >= 1:
if evt.args[0] == "--reset-device":
reset_device = True
else:
await evt.reply("**Usage:** `$cmdprefix+sp logout [--reset-device]`")
return
else:
reset_device = False
puppet = await pu.Puppet.get_by_ktid(evt.sender.ktid)
await evt.sender.logout(reset_device=reset_device)
if puppet.is_real_user:
await puppet.switch_mxid(None, None)
message = "Successfully logged out"
if reset_device:
message += (
", and your next login will use a different device ID.\n\n"
"The old device must be manually de-registered from the KakaoTalk app."
)
await evt.reply(message)
@command_handler(needs_auth=False, help_section=SECTION_AUTH, help_text="Change your virtual device ID for next login")
async def reset_device(evt: CommandEvent) -> None:
if await evt.sender.is_logged_in():
await evt.reply("This command requires you to be logged out.")
else:
await evt.mark_read()
await evt.sender.logout(reset_device=True)
await evt.reply(
"Your next login will use a different device ID.\n\n"
"The old device must be manually de-registered from the KakaoTalk app."
)
@command_handler(
needs_auth=False,
help_section=SECTION_AUTH,
help_text="When logging in, automatically log out of any other existing KakaoTalk session",
aliases=["enable-force-login"],
)
async def enable_forced_login(evt: CommandEvent) -> None:
if evt.sender.force_login:
await evt.reply("Forced login is already enabled.")
return
evt.sender.force_login = True
await evt.sender.save()
await evt.reply("Forced login is now enabled.")
@command_handler(
needs_auth=False,
help_section=SECTION_AUTH,
help_text="When logging in, ask before logging out of another existing KakaoTalk session, if one exists",
aliases=["disable-force-login"],
)
async def disable_forced_login(evt: CommandEvent) -> None:
if not evt.sender.force_login:
await evt.reply("Forced login is already disabled.")
return
evt.sender.force_login = False
await evt.sender.save()
await evt.reply("Forced login is now disabled.")