Separate the logging #27

Open
lecris wants to merge 6 commits from lecris/matrix-puppeteer-line:logger into testing
4 changed files with 56 additions and 66 deletions
Showing only changes of commit 3fdb6cc8d3 - Show all commits

View File

@ -16,7 +16,6 @@
import net from "net" import net from "net"
import fs from "fs" import fs from "fs"
import path from "path" import path from "path"
import logger from "loglevel"
import Client from "./client.js" import Client from "./client.js"
import { promisify } from "./util.js" import { promisify } from "./util.js"
@ -30,10 +29,7 @@ export default class PuppetAPI {
this.clients = new Map() this.clients = new Map()
this.connIDSequence = 0 this.connIDSequence = 0
this.stopped = false this.stopped = false
} this.log = require("loglevel").getLogger("API")
log(...text) {
console.log("[API]", ...text)
} }
acceptConnection = sock => { acceptConnection = sock => {
@ -58,16 +54,16 @@ export default class PuppetAPI {
} catch (err) {} } catch (err) {}
await promisify(cb => this.server.listen(socketPath, cb)) await promisify(cb => this.server.listen(socketPath, cb))
await fs.promises.chmod(socketPath, 0o700) await fs.promises.chmod(socketPath, 0o700)
logger.info("Now listening at", socketPath) this.log.info("Now listening at", socketPath)
} }
async startTCP(port, host) { async startTCP(port, host) {
await promisify(cb => this.server.listen(port, host, cb)) await promisify(cb => this.server.listen(port, host, cb))
logger.info(`Now listening at ${host || ""}:${port}`) this.log.info(`Now listening at ${host || ""}:${port}`)
} }
async start() { async start() {
this.log("Starting server") this.log.info("Starting server")
if (this.listenConfig.type === "unix") { if (this.listenConfig.type === "unix") {
await this.startUnix(this.listenConfig.path) await this.startUnix(this.listenConfig.path)
@ -85,14 +81,14 @@ export default class PuppetAPI {
socket.end() socket.end()
socket.destroy() socket.destroy()
} }
this.log("Stopping server") this.log.info("Stopping server")
await promisify(cb => this.server.close(cb)) await promisify(cb => this.server.close(cb))
if (this.listenConfig.type === "unix") { if (this.listenConfig.type === "unix") {
try { try {
await fs.promises.unlink(this.listenConfig.path) await fs.promises.unlink(this.listenConfig.path)
} catch (err) {} } catch (err) {}
} }
this.log("Stopping puppets") this.log.info("Stopping puppets")
for (const puppet of this.puppets.values()) { for (const puppet of this.puppets.values()) {
await puppet.stop() await puppet.stop()
} }

View File

@ -15,6 +15,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>. // along with this program. If not, see <https://www.gnu.org/licenses/>.
import MessagesPuppeteer from "./puppet.js" import MessagesPuppeteer from "./puppet.js"
import {emitLines, promisify} from "./util.js" import {emitLines, promisify} from "./util.js"
import logger from "loglevel";
export default class Client { export default class Client {
/** /**
@ -28,31 +29,27 @@ export default class Client {
this.manager = manager this.manager = manager
this.socket = socket this.socket = socket
this.connID = connID this.connID = connID
this.userID = userID
this.puppet = puppet this.puppet = puppet
this.stopped = false this.stopped = false
this.notificationID = 0 this.notificationID = 0
this.maxCommandID = 0 this.maxCommandID = 0
} this.set_userID(userID)
if (!this.userID) {
log(...text) { this.log = require("loglevel").getLogger(`API/${this.connID}`)
if (this.userID) { this.log.setLevel(logger.getLogger("API").getLevel())
console.log(`[API/${this.userID}/${this.connID}]`, ...text)
} else {
console.log(`[API/${this.connID}]`, ...text)
} }
} }
error(...text) { set_userID(ID) {
this.userID = ID
if (this.userID) { if (this.userID) {
console.error(`[API/${this.userID}/${this.connID}]`, ...text) this.log = require("loglevel").getLogger(`API/${this.userID}/${this.connID}`)
} else { this.log.setLevel(logger.getLogger(`API/${this.connID}`).getLevel())
console.error(`[API/${this.connID}]`, ...text)
} }
} }
start() { start() {
this.log("Received connection", this.connID) this.log.info("Received connection", this.connID)
emitLines(this.socket) emitLines(this.socket)
this.socket.on("line", line => this.handleLine(line) this.socket.on("line", line => this.handleLine(line)
.catch(err => this.log("Error handling line:", err))) .catch(err => this.log("Error handling line:", err)))
@ -60,7 +57,7 @@ export default class Client {
setTimeout(() => { setTimeout(() => {
if (!this.userID && !this.stopped) { if (!this.userID && !this.stopped) {
this.log("Didn't receive register request within 3 seconds, terminating") this.log.warn("Didn't receive register request within 3 seconds, terminating")
this.stop("Register request timeout") this.stop("Register request timeout")
} }
}, 3000) }, 3000)
@ -75,7 +72,7 @@ export default class Client {
await this._write({id: --this.notificationID, command: "quit", error}) await this._write({id: --this.notificationID, command: "quit", error})
await promisify(cb => this.socket.end(cb)) await promisify(cb => this.socket.end(cb))
} catch (err) { } catch (err) {
this.error("Failed to end connection:", err) this.log.error("Failed to end connection:", err)
this.socket.destroy(err) this.socket.destroy(err)
} }
} }
@ -85,7 +82,7 @@ export default class Client {
if (this.userID && this.manager.clients.get(this.userID) === this) { if (this.userID && this.manager.clients.get(this.userID) === this) {
this.manager.clients.delete(this.userID) this.manager.clients.delete(this.userID)
} }
this.log(`Connection closed (user: ${this.userID})`) this.log.info(`Connection closed (user: ${this.userID})`)
} }
/** /**
@ -99,7 +96,7 @@ export default class Client {
} }
sendMessage(message) { sendMessage(message) {
this.log(`Sending message ${message.id || "with no ID"} to client`) this.log.debug(`Sending message ${message.id || "with no ID"} to client`)
return this._write({ return this._write({
id: --this.notificationID, id: --this.notificationID,
command: "message", command: "message",
@ -109,7 +106,7 @@ export default class Client {
} }
sendReceipt(receipt) { sendReceipt(receipt) {
this.log(`Sending read receipt (${receipt.count || "DM"}) of msg ${receipt.id} for chat ${receipt.chat_id}`) this.log.debug(`Sending read receipt (${receipt.count || "DM"}) of msg ${receipt.id} for chat ${receipt.chat_id}`)
return this._write({ return this._write({
id: --this.notificationID, id: --this.notificationID,
command: "receipt", command: "receipt",
@ -118,7 +115,7 @@ export default class Client {
} }
sendQRCode(url) { sendQRCode(url) {
this.log(`Sending QR ${url} to client`) this.log.debug(`Sending QR ${url} to client`)
return this._write({ return this._write({
id: --this.notificationID, id: --this.notificationID,
command: "qr", command: "qr",
@ -127,7 +124,7 @@ export default class Client {
} }
sendPIN(pin) { sendPIN(pin) {
this.log(`Sending PIN ${pin} to client`) this.log.debug(`Sending PIN ${pin} to client`)
return this._write({ return this._write({
id: --this.notificationID, id: --this.notificationID,
command: "pin", command: "pin",
@ -136,7 +133,7 @@ export default class Client {
} }
sendLoginSuccess() { sendLoginSuccess() {
this.log("Sending login success to client") this.log.debug("Sending login success to client")
return this._write({ return this._write({
id: --this.notificationID, id: --this.notificationID,
command: "login_success", command: "login_success",
@ -144,7 +141,7 @@ export default class Client {
} }
sendLoginFailure(reason) { sendLoginFailure(reason) {
this.log(`Sending login failure to client${reason ? `: "${reason}"` : ""}`) this.log.debug(`Sending login failure to client${reason ? `: "${reason}"` : ""}`)
return this._write({ return this._write({
id: --this.notificationID, id: --this.notificationID,
command: "login_failure", command: "login_failure",
@ -153,7 +150,7 @@ export default class Client {
} }
sendLoggedOut() { sendLoggedOut() {
this.log("Sending logout notice to client") this.log.debug("Sending logout notice to client")
return this._write({ return this._write({
id: --this.notificationID, id: --this.notificationID,
command: "logged_out", command: "logged_out",
@ -163,7 +160,7 @@ export default class Client {
handleStart = async (req) => { handleStart = async (req) => {
let started = false let started = false
if (this.puppet === null) { if (this.puppet === null) {
this.log("Opening new puppeteer for", this.userID) this.log.info("Opening new puppeteer for", this.userID)
this.puppet = new MessagesPuppeteer(this.userID, this.ownID, this.sendPlaceholders, this) this.puppet = new MessagesPuppeteer(this.userID, this.ownID, this.sendPlaceholders, this)
this.manager.puppets.set(this.userID, this.puppet) this.manager.puppets.set(this.userID, this.puppet)
await this.puppet.start(!!req.debug) await this.puppet.start(!!req.debug)
@ -181,7 +178,7 @@ export default class Client {
if (this.puppet === null) { if (this.puppet === null) {
return {stopped: false} return {stopped: false}
} }
this.log("Closing puppeteer for", this.userID) this.log.info("Closing puppeteer for", this.userID)
this.manager.puppets.delete(this.userID) this.manager.puppets.delete(this.userID)
await this.puppet.stop() await this.puppet.stop()
this.puppet = null this.puppet = null
@ -193,14 +190,14 @@ export default class Client {
} }
handleRegister = async (req) => { handleRegister = async (req) => {
this.userID = req.user_id this.set_userID(req.user_id)
this.ownID = req.own_id this.ownID = req.own_id
this.sendPlaceholders = req.ephemeral_events this.sendPlaceholders = req.ephemeral_events
this.log(`Registered socket ${this.connID} -> ${this.userID}${!this.sendPlaceholders ? "" : " (with placeholder message support)"}`) this.log.info(`Registered socket ${this.connID} -> ${this.userID}${!this.sendPlaceholders ? "" : " (with placeholder message support)"}`)
if (this.manager.clients.has(this.userID)) { if (this.manager.clients.has(this.userID)) {
const oldClient = this.manager.clients.get(this.userID) const oldClient = this.manager.clients.get(this.userID)
this.manager.clients.set(this.userID, this) this.manager.clients.set(this.userID, this)
this.log(`Terminating previous socket ${oldClient.connID} for ${this.userID}`) this.log.info(`Terminating previous socket ${oldClient.connID} for ${this.userID}`)
await oldClient.stop("Socket replaced by new connection") await oldClient.stop("Socket replaced by new connection")
} else { } else {
this.manager.clients.set(this.userID, this) this.manager.clients.set(this.userID, this)
@ -214,36 +211,36 @@ export default class Client {
async handleLine(line) { async handleLine(line) {
if (this.stopped) { if (this.stopped) {
this.log("Ignoring line, client is stopped") this.log.info("Ignoring line, client is stopped")
return return
} }
let req let req
try { try {
req = JSON.parse(line) req = JSON.parse(line)
} catch (err) { } catch (err) {
this.log("Non-JSON request:", line) this.log.error("Non-JSON request:", line)
return return
} }
if (!req.command || !req.id) { if (!req.command || !req.id) {
this.log("Invalid request:", line) this.log.error("Invalid request:", line)
return return
} }
if (req.id <= this.maxCommandID) { if (req.id <= this.maxCommandID) {
this.log("Ignoring old request", req.id) this.log.warn("Ignoring old request", req.id)
return return
} }
if (req.command != "is_connected") { if (req.command != "is_connected") {
this.log("Received request", req.id, "with command", req.command) this.log.info("Received request", req.id, "with command", req.command)
} }
this.maxCommandID = req.id this.maxCommandID = req.id
let handler let handler
if (!this.userID) { if (!this.userID) {
if (req.command !== "register") { if (req.command !== "register") {
this.log("First request wasn't a register request, terminating") this.log.info("First request wasn't a register request, terminating")
await this.stop("Invalid first request") await this.stop("Invalid first request")
return return
} else if (!req.user_id) { } else if (!req.user_id) {
this.log("Register request didn't contain user ID, terminating") this.log.info("Register request didn't contain user ID, terminating")
await this.stop("Invalid register request") await this.stop("Invalid register request")
return return
} }
@ -277,7 +274,7 @@ export default class Client {
} catch (err) { } catch (err) {
resp.command = "error" resp.command = "error"
resp.error = err.toString() resp.error = err.toString()
this.log("Error handling request", req.id, err) this.log.error("Error handling request", req.id, err)
} }
await this._write(resp) await this._write(resp)
} }

View File

@ -50,6 +50,8 @@ logger.setLevel(clog ? clog.default_level || 3 : 3)
logger.getLogger("Puppeteer").setLevel(clogpup ? clogpup.level || logger.getLevel() : logger.getLevel()) logger.getLogger("Puppeteer").setLevel(clogpup ? clogpup.level || logger.getLevel() : logger.getLevel())
logger.getLogger("Puppeteer_details").setLevel(clogpup ? clogpup.details || 3 : 3) logger.getLogger("Puppeteer_details").setLevel(clogpup ? clogpup.details || 3 : 3)
logger.getLogger("Puppeteer_spammer").setLevel(clogpup ? clogpup.spammer || 3 : 3) logger.getLogger("Puppeteer_spammer").setLevel(clogpup ? clogpup.spammer || 3 : 3)
logger.getLogger("TaskQueue").setLevel(clog ? clog.task_queue || 3 : 3)
logger.getLogger("API").setLevel(clog ? clog.api || 3 : 3)
// Register and specify the logger format. Others will inherit // Register and specify the logger format. Others will inherit
loggerprefix.reg(logger) loggerprefix.reg(logger)
loggerprefix.apply(logger, {template: '[%n] %l:'}); loggerprefix.apply(logger, {template: '[%n] %l:'});

View File

@ -13,6 +13,7 @@
// //
// You should have received a copy of the GNU Affero General Public License // You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>. // along with this program. If not, see <https://www.gnu.org/licenses/>.
import logger from "loglevel"
export default class TaskQueue { export default class TaskQueue {
constructor(id) { constructor(id) {
@ -20,21 +21,15 @@ export default class TaskQueue {
this._tasks = [] this._tasks = []
this.running = false this.running = false
this._wakeup = null this._wakeup = null
} this.log = logger.getLogger(`TaskQueue/${this.id}`)
this.log.setLevel(logger.getLogger("TaskQueue").getLevel())
log(...text) {
console.log(`[TaskQueue/${this.id}]`, ...text)
}
error(...text) {
console.error(`[TaskQueue/${this.id}]`, ...text)
} }
async _run() { async _run() {
this.log("Started processing tasks") this.log.info("Started processing tasks")
while (this.running) { while (this.running) {
if (this._tasks.length === 0) { if (this._tasks.length === 0) {
this.log("Sleeping until a new task is received") this.log.debug("Sleeping until a new task is received")
await new Promise(resolve => this._wakeup = () => { await new Promise(resolve => this._wakeup = () => {
resolve() resolve()
this._wakeup = null this._wakeup = null
@ -42,12 +37,12 @@ export default class TaskQueue {
if (!this.running) { if (!this.running) {
break break
} }
this.log("Continuing processing tasks") this.log.debug("Continuing processing tasks")
} }
const { task, resolve, reject } = this._tasks.shift() const { task, resolve, reject } = this._tasks.shift()
await task().then(resolve, reject) await task().then(resolve, reject)
} }
this.log("Stopped processing tasks") this.log.info("Stopped processing tasks")
} }
/** /**
@ -79,7 +74,7 @@ export default class TaskQueue {
return return
} }
this.running = true this.running = true
this._run().catch(err => this.error("Fatal error processing tasks:", err)) this._run().catch(err => this.log.error("Fatal error processing tasks:", err))
} }
/** /**