Add API for puppeteer script

This commit is contained in:
Tulir Asokan 2020-08-18 16:47:06 +03:00
parent c8810a14f5
commit dd16b3d461
11 changed files with 1780 additions and 284 deletions

2
.gitignore vendored
View File

@ -11,7 +11,7 @@ __pycache__
/.eggs
node_modules
/profiles
profiles
/config.yaml
/registration.yaml

View File

@ -1,4 +1,5 @@
{
"parser": "babel-eslint",
"env": {
"es6": true,
"browser": true
@ -13,13 +14,12 @@
"sourceType": "module"
},
"plugins": [
"import",
"react-hooks"
"import"
],
"rules": {
"indent": [
"error",
4
"tab"
],
"linebreak-style": [
"error",
@ -53,12 +53,6 @@
"no-trailing-spaces": [
"error"
],
"camelcase": [
"error",
{
"properties": "always"
}
],
"import/no-unresolved": "off",
"import/named": "error",
"import/namespace": "error",

View File

@ -6,12 +6,21 @@
"type": "git",
"url": "git+https://mau.dev/tulir/mautrix-amp.git"
},
"type": "module",
"main": "src/main.js",
"author": "Tulir Asokan <tulir@maunium.net>",
"license": "AGPL-3.0-or-later",
"homepage": "https://mau.dev/tulir/mautrix-amp",
"scripts": {
"start": "node ./src/main.js"
},
"dependencies": {
"chrono-node": "^2.1.7",
"puppeteer": "^5.2.1"
},
"devDependencies": {
"babel-eslint": "^10.1.0",
"eslint": "^7.7.0",
"eslint-plugin-import": "^2.22.0"
}
}

View File

@ -13,3 +13,56 @@
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
import net from "net"
import fs from "fs"
import path from "path"
import Client from "./client.js"
import { promisify } from "./util.js"
export default class PuppetAPI {
path = "/var/run/mautrix-amp/puppet.sock"
constructor() {
this.server = net.createServer(sock =>
new Client(this, sock, ++this.connIDSequence).start())
this.puppets = new Map()
this.clients = new Map()
this.connIDSequence = 0
}
log(...text) {
console.log("[API]", ...text)
}
async start() {
this.log("Starting server")
try {
await fs.promises.access(path.dirname(this.path))
} catch (err) {
await fs.promises.mkdir(path.dirname(this.path), 0o700)
}
try {
await fs.promises.unlink(this.path)
} catch (err) {}
await promisify(cb => this.server.listen(this.path, cb))
await fs.promises.chmod(this.path, 0o700)
this.log("Now listening at", this.path)
}
async stop() {
this.log("Stopping server")
await promisify(cb => this.server.close(cb))
try {
await fs.promises.unlink(this.path)
} catch (err) {}
this.log("Server stopped")
for (const client of this.clients.values()) {
await client.stop()
}
for (const puppet of this.puppets.values()) {
await puppet.stop()
}
}
}

198
puppet/src/client.js Normal file
View File

@ -0,0 +1,198 @@
// mautrix-amp - A very hacky Matrix-SMS bridge based on using Android Messages for Web in Puppeteer
// Copyright (C) 2020 Tulir Asokan
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
import MessagesPuppeteer from "./puppet.js"
import { emitLines, promisify } from "./util.js"
export default class Client {
/**
* @param {PuppetAPI} manager
* @param {import("net").Socket} socket
* @param {number} connID
* @param {?string} [userID]
* @param {?MessagesPuppeteer} [puppet]
*/
constructor(manager, socket, connID, userID = null, puppet = null) {
this.manager = manager
this.socket = socket
this.connID = connID
this.userID = userID
this.puppet = puppet
this.stopped = false
this.notificationID = 0
this.maxCommandID = 0
}
log(...text) {
if (this.userID) {
console.log(`[API/${this.userID}/${this.connID}]`, ...text)
} else {
console.log(`[API/${this.connID}]`, ...text)
}
}
start() {
this.log("Received connection", this.connID)
emitLines(this.socket)
this.socket.on("line", line => this.handleLine(line)
.catch(err => this.log("Error handling line:", err)))
this.socket.on("end", this.handleEnd)
setTimeout(() => {
if (!this.userID && !this.stopped) {
this.log("Didn't receive register request within 3 seconds, terminating")
this.stop()
}
}, 3000)
}
stop() {
if (this.stopped) {
return
}
this.stopped = true
return promisify(cb => this.socket.end(cb))
}
handleEnd = () => {
this.stopped = true
if (this.userID && this.manager.clients.get(this.userID) === this) {
this.manager.clients.delete(this.userID)
}
this.log(`Connection closed (user: ${this.userID})`)
}
/**
* Write JSON data to the socket.
*
* @param {object} data - The data to write.
* @return {Promise<void>}
*/
_write(data) {
return promisify(cb => this.socket.write(JSON.stringify(data) + "\n", cb))
}
sendMessage(message) {
return this._write({
id: --this.notificationID,
command: "message",
message,
})
}
sendQRCode(url) {
return this._write({
id: --this.notificationID,
command: "qr",
url,
})
}
handleStart = async (req) => {
if (this.puppet !== null) {
return { started: false, is_logged_in: await this.puppet.isLoggedIn() }
}
this.log("Opening new puppeteer for", this.userID)
this.puppet = new MessagesPuppeteer(this.userID, `./profiles/${this.userID}`, this)
this.manager.puppets.set(this.userID, this.puppet)
await this.puppet.start(!!req.debug)
return { started: true, is_logged_in: await this.puppet.isLoggedIn() }
}
handleStop = async () => {
if (this.puppet === null) {
return { stopped: false }
}
this.log("Closing puppeteer for", this.userID)
this.manager.puppets.delete(this.userID)
await this.puppet.stop()
this.puppet = null
return { stopped: true }
}
handleUnknownCommand = () => {
throw new Error("Unknown command")
}
handleRegister = async (req) => {
this.userID = req.user_id
this.log("Registered socket", this.connID, "->", this.userID)
if (this.manager.clients.has(this.userID)) {
const oldClient = this.manager.clients.get(this.userID)
this.manager.clients.set(this.userID, this)
this.log("Terminating previous socket", oldClient.connID, "for", this.userID)
await oldClient.stop()
} else {
this.manager.clients.set(this.userID, this)
}
this.puppet = this.manager.puppets.get(this.userID) || null
if (this.puppet) {
this.puppet.client = this
}
return { client_exists: this.puppet !== null }
}
async handleLine(line) {
if (this.stopped) {
this.log("Ignoring line, client is stopped")
return
}
const req = JSON.parse(line)
if (!req.command || !req.id) {
console.log("Invalid request, terminating", this.connID)
await this.stop()
return
}
if (req.id <= this.maxCommandID) {
this.log("Ignoring old request", req.id)
return
}
this.log("Received request", req.id, "with command", req.command)
this.maxCommandID = req.id
let handler
if (!this.userID) {
if (req.command !== "register") {
this.log("First request wasn't a register request, terminating")
await this.stop()
return
} else if (!req.user_id) {
this.log("Register request didn't contain user ID, terminating")
await this.stop()
return
}
handler = this.handleRegister
} else {
handler = {
start: this.handleStart,
stop: this.handleStop,
login: () => this.puppet.waitForLogin(),
send: req => this.puppet.sendMessage(req.chat_id, req.text),
get_chats: () => this.puppet.getRecentChats(),
get_chat: req => this.puppet.getChatInfo(req.chat_id),
get_messages: req => this.puppet.getMessages(req.chat_id),
}[req.command] || this.handleUnknownCommand
}
const resp = { id: req.id }
try {
resp.command = "response"
resp.response = await handler(req)
} catch (err) {
resp.command = "error"
resp.error = err.toString()
this.log("Error handling request", req.id, err)
}
await this._write(resp)
}
}

View File

@ -84,11 +84,11 @@ class MautrixController {
/**
* @typedef MessageData
* @type {object}
* @property {number} id - The ID of the message. Seems to be sequential.
* @property {number} timestamp - The unix timestamp of the message. Not very accurate.
* @property {boolean} isOutgoing - Whether or not this user sent the message.
* @property {string} [text] - The text in the message.
* @property {string} [image] - The URL to the image in the message.
* @property {number} id - The ID of the message. Seems to be sequential.
* @property {number} timestamp - The unix timestamp of the message. Not very accurate.
* @property {boolean} is_outgoing - Whether or not this user sent the message.
* @property {string} [text] - The text in the message.
* @property {string} [image] - The URL to the image in the message.
*/
/**
@ -103,7 +103,7 @@ class MautrixController {
const messageData = {
id: +element.getAttribute("msg-id"),
timestamp: date ? date.getTime() : null,
isOutgoing: element.getAttribute("is-outgoing") === "true",
is_outgoing: element.getAttribute("is-outgoing") === "true",
}
messageData.text = element.querySelector("mws-text-message-part .text-msg")?.innerText
if (element.querySelector("mws-image-message-part .image-msg")) {
@ -124,13 +124,13 @@ class MautrixController {
let messageDate = null
for (const child of element.children) {
switch (child.tagName.toLowerCase()) {
case "mws-message-wrapper":
messages.push(this._parseMessage(messageDate, child))
break
case "mws-tombstone-message-wrapper":
const dateText = child.getElementsByTagName("mws-relative-timestamp")?.[0]?.innerText
messageDate = (await this._parseDate(dateText, messageDate)) || messageDate
break
case "mws-message-wrapper":
messages.push(this._parseMessage(messageDate, child))
break
case "mws-tombstone-message-wrapper":
const dateText = child.querySelector("mws-relative-timestamp")?.innerText
messageDate = await this._parseDate(dateText, messageDate) || messageDate
break
}
}
return messages
@ -168,7 +168,8 @@ class MautrixController {
* @type object
* @property {number} id - The ID of the chat.
* @property {string} name - The name of the chat.
* @property {string} lastMsg - The most recent message in the chat. May be prefixed by sender name.
* @property {string} lastMsg - The most recent message in the chat.
* May be prefixed by sender name.
* @property {string} lastMsgDate - An imprecise date for the most recent message (e.g. "7:16 PM", "Thu" or "Aug 4")
*/
@ -225,7 +226,8 @@ class MautrixController {
* @return {Promise<string>} - The data URL (containing the mime type and base64 data)
*/
async readImage(id) {
const resp = await fetch(url)
const imageElement = document.querySelector(`mws-message-wrapper[msg-id="${id}"] mws-image-message-part .image-msg`)
const resp = await fetch(imageElement.getAttribute(src))
const reader = new FileReader()
const promise = new Promise((resolve, reject) => {
reader.onload = () => resolve(reader.result)

View File

@ -13,252 +13,23 @@
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const puppeteer = require("puppeteer")
const chrono = require("chrono-node")
const process = require("process")
const path = require("path")
import process from "process"
const TaskQueue = require("./taskqueue")
import PuppetAPI from "./api.js"
class MessagesPuppeteer {
url = "https://messages.google.com/web/"
const api = new PuppetAPI()
constructor(profilePath) {
if (!profilePath.startsWith("/")) {
profilePath = path.join(process.cwd(), profilePath)
}
this.profilePath = profilePath
this.updatedChats = new Set()
this.mostRecentMessages = new Map()
this.taskQueue = new TaskQueue()
}
/**
* Start the browser and open the messages for web page.
* This must be called before doing anything else.
*/
async start(debug = false) {
console.log("Launching browser")
this.browser = await puppeteer.launch({
userDataDir: this.profilePath,
headless: !debug,
defaultViewport: { width: 1920, height: 1080 },
})
console.log("Opening new tab")
const pages = await this.browser.pages()
if (pages.length > 0) {
this.page = pages[0]
} else {
this.page = await this.browser.newPage()
}
console.log("Opening", this.url)
await this.page.goto(this.url)
console.log("Injecting content script")
await this.page.addScriptTag({ path: "./src/contentscript.js", type: "module" })
console.log("Exposing this._receiveQRChange")
await this.page.exposeFunction("__mautrixReceiveQR", this._receiveQRChange.bind(this))
console.log("Exposing this._receiveChatListChanges")
await this.page.exposeFunction("__mautrixReceiveChanges", this._receiveChatListChanges.bind(this))
console.log("Exposing chrono.parseDate")
await this.page.exposeFunction("__chronoParseDate", chrono.parseDate)
console.log("Waiting for load")
// Wait for the page to load (either QR code for login or chat list when already logged in)
await Promise.race([
this.page.waitForSelector("mw-main-container mws-conversations-list .conv-container", { visible: true }),
this.page.waitForSelector("mw-authentication-container mw-qr-code", { visible: true }),
])
this.taskQueue.start()
if (await this.isLoggedIn()) {
await this.startObserving()
}
console.log("Startup complete")
}
/**
* Wait for the session to be logged in and monitor QR code changes while it's not.
*/
async waitForLogin() {
if (await this.isLoggedIn()) {
return
}
const qrSelector = "mw-authentication-container mw-qr-code"
console.log("Clicking Remember Me button")
await this.page.click("mat-slide-toggle:not(.mat-checked) > label")
console.log("Fetching current QR code")
const currentQR = await this.page.$eval(qrSelector, element => element.getAttribute("data-qr-code"))
this._receiveQRChange(currentQR)
console.log("Adding QR observer")
await this.page.$eval(qrSelector, element => window.__mautrixController.addQRObserver(element))
console.log("Waiting for login")
await this.page.waitForSelector("mws-conversations-list .conv-container", {
visible: true,
timeout: 0,
})
console.log("Removing QR observer")
await this.page.evaluate(() => window.__mautrixController.removeQRObserver())
await this.startObserving()
console.log("Login complete")
}
/**
* Close the browser.
*/
async stop() {
this.taskQueue.stop()
await this.page.close()
await this.browser.close()
console.log("Everything stopped")
}
/**
* Check if the session is currently logged in.
*
* @return {Promise<boolean>} - Whether or not the session is logged in.
*/
async isLoggedIn() {
return (await this.page.$("mw-main-container mws-conversations-list")) !== null
}
/**
* Get the IDs of the most recent chats.
*
* @return {Promise<[ChatListInfo]>} - List of chat IDs in order of most recent message.
*/
async getRecentChats() {
return await this.page.$eval("mws-conversations-list .conv-container",
elem => window.__mautrixController.parseChatList(elem))
}
/**
* @typedef ChatInfo
* @type object
* @property {[Participant]} participants
*/
/**
* Get info about a chat.
*
* @param {number} id - The chat ID whose info to get.
* @return {Promise<ChatInfo>} - Info about the chat.
*/
async getChatInfo(id) {
return await this.taskQueue.push(() => this._getChatInfoUnsafe(id))
}
/**
* Send a message to a chat.
*
* @param {number} chatID - The ID of the chat to send a message to.
* @param {string} text - The text to send.
*/
async sendMessage(chatID, text) {
await this.taskQueue.push(() => this._sendMessageUnsafe(chatID, text))
}
/**
* Get messages in a chat.
*
* @param {number} id The ID of the chat whose messages to get.
* @return {Promise<[MessageData]>} - The messages visible in the chat.
*/
async getMessages(id) {
return this.taskQueue.push(() => this._getMessagesUnsafe(id))
}
async startObserving() {
console.log("Adding chat list observer")
await this.page.$eval("mws-conversations-list .conv-container",
element => window.__mautrixController.addChatListObserver(element))
}
async stopObserving() {
console.log("Removing chat list observer")
await this.page.evaluate(() => window.__mautrixController.removeChatListObserver())
}
_listItemSelector(id) {
return `mws-conversation-list-item > a.list-item[href="/web/conversations/${id}"]`
}
async _switchChatUnsafe(id) {
console.log("Switching to chat", id)
await this.page.click(this._listItemSelector(id))
}
async _getChatInfoUnsafe(id) {
await this._switchChatUnsafe(id)
await this.page.click("mw-conversation-menu button")
await this.page.waitForSelector(".mat-menu-panel button.mat-menu-item.details", { timeout: 1000 })
// There's a 250ms animation and I don't know how to wait for it properly
await new Promise(resolve => setTimeout(resolve, 250))
await this.page.click(".mat-menu-panel button.mat-menu-item.details")
await this.page.waitForSelector("mws-dialog mw-conversation-details .participants", { timeout: 1000 })
const participants = await this.page.$eval("mws-dialog mw-conversation-details .participants",
elem => window.__mautrixController.parseParticipantList(elem))
await this.page.click("mws-dialog mat-dialog-actions button.confirm")
return {
participants,
...await this.page.$eval(this._listItemSelector(id),
elem => window.__mautrixController.parseChatListItem(elem)),
}
}
async _sendMessageUnsafe(chatID, text) {
await this._switchChatUnsafe(chatID)
await this.page.focus("mws-message-compose .input-box textarea")
await this.page.keyboard.type(text)
await this.page.click(".compose-container > mws-message-send-button > button")
}
async _getMessagesUnsafe(id, minID = 0) {
await this._switchChatUnsafe(id)
console.log("Waiting for messages to load")
await this.page.waitFor("mws-message-wrapper")
const messages = await this.page.$eval("mws-messages-list .content",
element => window.__mautrixController.parseMessageList(element))
if (minID) {
return messages.filter(message => message.id > minID)
}
return messages
}
async _processChatListChangeUnsafe(id) {
this.updatedChats.delete(id)
console.log("Processing change to", id)
const lastMsgID = this.mostRecentMessages.get(id) || 0
const messages = await this._getMessagesUnsafe(id, lastMsgID)
if (messages.length === 0) {
console.log("No new messages found in", id)
return
}
const newFirstID = messages[0].id
const newLastID = messages[messages.length - 1].id
this.mostRecentMessages.set(id, newLastID)
console.log(`Loaded messages in ${id} after ${lastMsgID}: got ${newFirstID}-${newLastID}`)
// TODO send messages somewhere
for (const message of messages) {
console.info("New message:", message)
message.chatID = id
}
}
_receiveChatListChanges(changes) {
console.log("Received chat list changes:", changes)
for (const item of changes) {
if (!this.updatedChats.has(item)) {
this.updatedChats.add(item)
this.taskQueue.push(() => this._processChatListChangeUnsafe(item))
.catch(err => console.error("Error handling chat list changes"))
}
}
}
_receiveQRChange(newLink) {
console.info("QR code changed:", newLink)
}
function stop() {
api.stop().then(() => process.exit(0), err => {
console.error("[Main] Error stopping:", err)
process.exit(3)
})
}
module.exports = MessagesPuppeteer
api.start().then(() => {
process.once("SIGINT", stop)
process.once("SIGTERM", stop)
}, err => {
console.error("[Main] Error starting:", err)
process.exit(2)
})

296
puppet/src/puppet.js Normal file
View File

@ -0,0 +1,296 @@
// mautrix-amp - A very hacky Matrix-SMS bridge based on using Android Messages for Web in Puppeteer
// Copyright (C) 2020 Tulir Asokan
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
import process from "process"
import path from "path"
import puppeteer from "puppeteer"
import chrono from "chrono-node"
import TaskQueue from "./taskqueue.js"
import { sleep } from "./util.js"
export default class MessagesPuppeteer {
url = "https://messages.google.com/web/"
/**
*
* @param {string} id
* @param {string} profilePath
* @param {?Client} [client]
*/
constructor(id, profilePath, client = null) {
if (!profilePath.startsWith("/")) {
profilePath = path.join(process.cwd(), profilePath)
}
this.id = id
this.profilePath = profilePath
this.updatedChats = new Set()
this.mostRecentMessages = new Map()
this.taskQueue = new TaskQueue(this.id)
this.client = client
}
log(...text) {
console.log(`[Puppeteer/${this.id}]`, ...text)
}
error(...text) {
console.error(`[Puppeteer/${this.id}]`, ...text)
}
/**
* Start the browser and open the messages for web page.
* This must be called before doing anything else.
*/
async start(debug = false) {
this.log("Launching browser")
this.browser = await puppeteer.launch({
userDataDir: this.profilePath,
headless: !debug,
defaultViewport: { width: 1920, height: 1080 },
})
this.log("Opening new tab")
const pages = await this.browser.pages()
if (pages.length > 0) {
this.page = pages[0]
} else {
this.page = await this.browser.newPage()
}
this.log("Opening", this.url)
await this.page.goto(this.url)
this.log("Injecting content script")
await this.page.addScriptTag({ path: "./src/contentscript.js", type: "module" })
this.log("Exposing functions")
await this.page.exposeFunction("__mautrixReceiveQR", this._receiveQRChange.bind(this))
await this.page.exposeFunction("__mautrixReceiveChanges",
this._receiveChatListChanges.bind(this))
await this.page.exposeFunction("__chronoParseDate", chrono.parseDate)
this.log("Waiting for load")
// Wait for the page to load (either QR code for login or chat list when already logged in)
await Promise.race([
this.page.waitForSelector("mw-main-container mws-conversations-list .conv-container",
{ visible: true }),
this.page.waitForSelector("mw-authentication-container mw-qr-code",
{ visible: true }),
])
this.taskQueue.start()
if (await this.isLoggedIn()) {
await this.startObserving()
}
this.log("Startup complete")
}
/**
* Wait for the session to be logged in and monitor QR code changes while it's not.
*/
async waitForLogin() {
if (await this.isLoggedIn()) {
return
}
const qrSelector = "mw-authentication-container mw-qr-code"
this.log("Clicking Remember Me button")
await this.page.click("mat-slide-toggle:not(.mat-checked) > label")
this.log("Fetching current QR code")
const currentQR = await this.page.$eval(qrSelector,
element => element.getAttribute("data-qr-code"))
this._receiveQRChange(currentQR)
this.log("Adding QR observer")
await this.page.$eval(qrSelector,
element => window.__mautrixController.addQRObserver(element))
this.log("Waiting for login")
await this.page.waitForSelector("mws-conversations-list .conv-container", {
visible: true,
timeout: 0,
})
this.log("Removing QR observer")
await this.page.evaluate(() => window.__mautrixController.removeQRObserver())
await this.startObserving()
this.log("Login complete")
}
/**
* Close the browser.
*/
async stop() {
this.taskQueue.stop()
await this.page.close()
await this.browser.close()
this.log("Everything stopped")
}
/**
* Check if the session is currently logged in.
*
* @return {Promise<boolean>} - Whether or not the session is logged in.
*/
async isLoggedIn() {
return await this.page.$("mw-main-container mws-conversations-list") !== null
}
/**
* Get the IDs of the most recent chats.
*
* @return {Promise<[ChatListInfo]>} - List of chat IDs in order of most recent message.
*/
async getRecentChats() {
return await this.page.$eval("mws-conversations-list .conv-container",
elem => window.__mautrixController.parseChatList(elem))
}
/**
* @typedef ChatInfo
* @type object
* @property {[Participant]} participants
*/
/**
* Get info about a chat.
*
* @param {number} id - The chat ID whose info to get.
* @return {Promise<ChatInfo>} - Info about the chat.
*/
async getChatInfo(id) {
return await this.taskQueue.push(() => this._getChatInfoUnsafe(id))
}
/**
* Send a message to a chat.
*
* @param {number} chatID - The ID of the chat to send a message to.
* @param {string} text - The text to send.
*/
async sendMessage(chatID, text) {
await this.taskQueue.push(() => this._sendMessageUnsafe(chatID, text))
}
/**
* Get messages in a chat.
*
* @param {number} id The ID of the chat whose messages to get.
* @return {Promise<[MessageData]>} - The messages visible in the chat.
*/
async getMessages(id) {
return this.taskQueue.push(() => this._getMessagesUnsafe(id))
}
async startObserving() {
this.log("Adding chat list observer")
await this.page.$eval("mws-conversations-list .conv-container",
element => window.__mautrixController.addChatListObserver(element))
}
async stopObserving() {
this.log("Removing chat list observer")
await this.page.evaluate(() => window.__mautrixController.removeChatListObserver())
}
_listItemSelector(id) {
return `mws-conversation-list-item > a.list-item[href="/web/conversations/${id}"]`
}
async _switchChatUnsafe(id) {
this.log("Switching to chat", id)
await this.page.click(this._listItemSelector(id))
}
async _getChatInfoUnsafe(id) {
await this._switchChatUnsafe(id)
await this.page.waitForSelector("mw-conversation-menu button", { timeout: 500 })
await this.page.click("mw-conversation-menu button")
await this.page.waitForSelector(".mat-menu-panel button.mat-menu-item.details",
{ timeout: 500 })
// There's a 250ms animation and I don't know how to wait for it properly
await sleep(250)
await this.page.click(".mat-menu-panel button.mat-menu-item.details")
await this.page.waitForSelector("mws-dialog mw-conversation-details .participants",
{ timeout: 500 })
const participants = await this.page.$eval(
"mws-dialog mw-conversation-details .participants",
elem => window.__mautrixController.parseParticipantList(elem))
await this.page.click("mws-dialog mat-dialog-actions button.confirm")
return {
participants,
...await this.page.$eval(this._listItemSelector(id),
elem => window.__mautrixController.parseChatListItem(elem)),
}
}
async _sendMessageUnsafe(chatID, text) {
await this._switchChatUnsafe(chatID)
await this.page.focus("mws-message-compose .input-box textarea")
await this.page.keyboard.type(text)
await this.page.click(".compose-container > mws-message-send-button > button")
}
async _getMessagesUnsafe(id, minID = 0) {
await this._switchChatUnsafe(id)
this.log("Waiting for messages to load")
await this.page.waitFor("mws-message-wrapper")
const messages = await this.page.$eval("mws-messages-list .content",
element => window.__mautrixController.parseMessageList(element))
if (minID) {
return messages.filter(message => message.id > minID)
}
return messages
}
async _processChatListChangeUnsafe(id) {
this.updatedChats.delete(id)
this.log("Processing change to", id)
const lastMsgID = this.mostRecentMessages.get(id) || 0
const messages = await this._getMessagesUnsafe(id, lastMsgID)
if (messages.length === 0) {
this.log("No new messages found in", id)
return
}
const newFirstID = messages[0].id
const newLastID = messages[messages.length - 1].id
this.mostRecentMessages.set(id, newLastID)
this.log(`Loaded messages in ${id} after ${lastMsgID}: got ${newFirstID}-${newLastID}`)
if (this.client) {
for (const message of messages) {
message.chat_id = id
await this.client.sendMessage(message).catch(err =>
this.error("Failed to send message", message.id, "to client:", err))
}
} else {
this.log("No client connected, not sending messages")
}
}
_receiveChatListChanges(changes) {
this.log("Received chat list changes:", changes)
for (const item of changes) {
if (!this.updatedChats.has(item)) {
this.updatedChats.add(item)
this.taskQueue.push(() => this._processChatListChangeUnsafe(item))
.catch(err => this.error("Error handling chat list changes:", err))
}
}
}
_receiveQRChange(url) {
if (this.client) {
this.client.sendQRCode(url).catch(err =>
this.error("Failed to send new QR to client:", err))
} else {
this.log("No client connected, not sending new QR")
}
}
}

View File

@ -14,18 +14,27 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
class TaskQueue {
constructor() {
export default class TaskQueue {
constructor(id) {
this.id = id
this._tasks = []
this.running = false
this._wakeup = null
}
log(...text) {
console.log(`[TaskQueue/${this.id}]`, ...text)
}
error(...text) {
console.error(`[TaskQueue/${this.id}]`, ...text)
}
async _run() {
console.log("Started processing tasks")
this.log("Started processing tasks")
while (this.running) {
if (this._tasks.length === 0) {
console.log("Sleeping until a new task is received")
this.log("Sleeping until a new task is received")
await new Promise(resolve => this._wakeup = () => {
resolve()
this._wakeup = null
@ -33,12 +42,12 @@ class TaskQueue {
if (!this.running) {
break
}
console.log("Continuing processing tasks")
this.log("Continuing processing tasks")
}
const { task, resolve, reject } = this._tasks.shift()
await task().then(resolve, reject)
}
console.log("Stopped processing tasks")
this.log("Stopped processing tasks")
}
/**
@ -70,7 +79,7 @@ class TaskQueue {
return
}
this.running = true
this._run().catch(err => console.error("Fatal error processing tasks:", err))
this._run().catch(err => this.error("Fatal error processing tasks:", err))
}
/**
@ -86,5 +95,3 @@ class TaskQueue {
}
}
}
module.exports = TaskQueue

43
puppet/src/util.js Normal file
View File

@ -0,0 +1,43 @@
// mautrix-amp - A very hacky Matrix-SMS bridge based on using Android Messages for Web in Puppeteer
// Copyright (C) 2020 Tulir Asokan
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
export function promisify(func) {
return new Promise((resolve, reject) => {
try {
func(err => err ? reject(err) : resolve())
} catch (err) {
reject(err)
}
})
}
export function sleep(timeout) {
return new Promise(resolve => setTimeout(resolve, timeout))
}
export function emitLines(stream) {
let buffer = ""
stream.on("data", data => {
buffer += data
let n = buffer.indexOf("\n")
while (~n) {
stream.emit("line", buffer.substring(0, n))
buffer = buffer.substring(n + 1)
n = buffer.indexOf("\n")
}
})
stream.on("end", () => buffer && stream.emit("line", buffer))
}

File diff suppressed because it is too large Load Diff