From dd2c320b7530a25d4bfcd5b0dea39fa5b0a34b93 Mon Sep 17 00:00:00 2001 From: N Date: Tue, 19 Nov 2024 22:17:14 +0300 Subject: [PATCH] MA-19046: refactor ws proxy --- CHANGELOG.md | 5 ++ src/constants.ts | 2 +- src/entities/HttpProxy.ts | 24 ++++------ src/entities/Logger.ts | 13 ++++++ src/entities/TunnelClient.ts | 89 +++++++++++++++++------------------- src/entities/WsProxy.ts | 85 +++++++++++++++------------------- src/index.d.ts | 33 +++++++++++++ src/types.ts | 12 +++-- src/vk-tunnel.ts | 3 +- 9 files changed, 148 insertions(+), 118 deletions(-) create mode 100644 src/entities/Logger.ts create mode 100644 src/index.d.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 06d927b..294a175 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +## v0.2.4 + +- Добавили поддержку бинарных WebSocket сообщений. Теперь сервер корректно обрабатывает и пересылает как текстовые, так и бинарные данные через WebSocket. +- Добавили возможность задавать `ws-origin через` через `process.env.PROXY_WS_ORIGIN`. + ## v0.2.3 - Добавили поддержку `hot-reload`. diff --git a/src/constants.ts b/src/constants.ts index c487cc0..a62ba3e 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -11,7 +11,7 @@ export const DEFAULT_USER_PROXY_APP_SETTINGS = { 'port': Number(process.env.PROXY_PORT ?? 10888), 'host': process.env.PROXY_HOST ?? 'localhost', 'insecure': 0, - 'ws-origin': 1, + 'ws-origin': Number(process.env.PROXY_WS_ORIGIN ?? 1), 'app_id': undefined, 'staging': undefined, 'endpoints': undefined, diff --git a/src/entities/HttpProxy.ts b/src/entities/HttpProxy.ts index 7dd4bbe..d9c122d 100644 --- a/src/entities/HttpProxy.ts +++ b/src/entities/HttpProxy.ts @@ -1,15 +1,9 @@ import axios, { AxiosResponse, Method } from 'axios'; -import { Logger } from 'pino'; +import { logger } from './Logger'; import { ProxiedNetworkPacket, SendResponseToProxyServer, UserProxyAppSettings } from '../types'; export class HttpProxy { - private userSettings: UserProxyAppSettings; - private logger: Logger; - - public constructor(userSettings: UserProxyAppSettings, logger: Logger) { - this.userSettings = userSettings; - this.logger = logger; - } + public constructor(private readonly userSettings: UserProxyAppSettings) {} private async getResponseFromProxiedServer( proxiedServerUrl: string, @@ -28,7 +22,7 @@ export class HttpProxy { }).catch((error) => console.log(error)); } - private generateRawResponseForVkProxyServer(proxiedServerResponse: AxiosResponse) { + private generateHeadersForVkTunnelBack(proxiedServerResponse: AxiosResponse) { let rawResponse = `HTTP/1.1 ${proxiedServerResponse.status} ${proxiedServerResponse.statusText}\r\n`; let keys = Object.keys(proxiedServerResponse.headers); @@ -54,7 +48,7 @@ export class HttpProxy { public async proxy( packetData: ProxiedNetworkPacket, - sendResponseToVkProxyServer: SendResponseToProxyServer, + sendResponseToVkTunnelBack: SendResponseToProxyServer, ) { const { seq, parsedRequest, messageType, endpoint } = packetData; const proxiedServerUrl = `${this.userSettings.httpProtocol}://${this.userSettings.host}:${this.userSettings.port}${parsedRequest.uri}`; @@ -68,14 +62,14 @@ export class HttpProxy { const buffer = Buffer.concat([ Buffer.from(seq, 'utf8'), Buffer.from(messageType, 'utf8'), - Buffer.from(this.generateRawResponseForVkProxyServer(response)), + Buffer.from(this.generateHeadersForVkTunnelBack(response)), response.data, ]); - sendResponseToVkProxyServer(buffer, () => { - this.logger.debug( + sendResponseToVkTunnelBack(buffer, () => { + logger.debug( 'REQUEST', - `seq: {$seq}`, + `seq: ${seq}`, `type: ${messageType.charCodeAt(0)}`, `endpoint: ${endpoint}`, ); @@ -86,7 +80,7 @@ export class HttpProxy { const uri = parsedRequest['uri'] || '-'; const ua = parsedRequest['headers']['User-Agent'] || '-'; const length = response.data.length; - this.logger.info(`${realIp} ${statusCode} ${host} ${method} ${uri} ${ua} ${length}`); + logger.info(`${realIp} ${statusCode} ${host} ${method} ${uri} ${ua} ${length}`); }); } } diff --git a/src/entities/Logger.ts b/src/entities/Logger.ts new file mode 100644 index 0000000..380f3d2 --- /dev/null +++ b/src/entities/Logger.ts @@ -0,0 +1,13 @@ +import pino from 'pino'; +import pinoPretty from 'pino-pretty'; + +export const logger = pino( + { + level: 'info', + base: {}, + timestamp: pino.stdTimeFunctions.isoTime, + }, + pinoPretty({ + colorize: true, + }), +); diff --git a/src/entities/TunnelClient.ts b/src/entities/TunnelClient.ts index c48e7cc..053429e 100644 --- a/src/entities/TunnelClient.ts +++ b/src/entities/TunnelClient.ts @@ -1,54 +1,37 @@ import { RawData, WebSocket } from 'ws'; import chalk from 'chalk'; import httpParser from 'http-string-parser'; -import pino, { Logger } from 'pino'; -import pinoPretty from 'pino-pretty'; import { WsProxy } from './WsProxy'; import { HttpProxy } from './HttpProxy'; +import { logger } from './Logger'; import { showSuccessLog } from '../helpers'; import { - MessageType, + MessageTypeFromBack, ProxiedNetworkPacket, TunnelConnectionData, UserProxyAppSettings, } from '../types'; export class TunnelClient { - private SEQ_BEGIN = 0; - private SEQ_END = 8; - private MSG_TYPE_BEGIN = 8; - private MSG_TYPE_END = 9; - private PAYLOAD_BEGIN = 9; - private DISABLE_COMPRESS = 'gzip;q=0,deflate;q=0'; - private ACCEPT_ENCODING = 'Accept-Encoding'; - - private userSettings: UserProxyAppSettings; - private tunnelData: TunnelConnectionData; + private readonly SEQ_BEGIN = 0; + private readonly SEQ_END = 8; + private readonly MSG_TYPE_BEGIN = 8; + private readonly MSG_TYPE_END = 9; + private readonly PAYLOAD_BEGIN = 9; + + private readonly DISABLE_COMPRESS = 'gzip;q=0,deflate;q=0'; + private readonly ACCEPT_ENCODING = 'Accept-Encoding'; + private HttpProxy: HttpProxy; private WsProxy: WsProxy; - private socket: WebSocket; - private logger: Logger; public constructor( - socket: WebSocket, - userSettings: UserProxyAppSettings, - tunnelData: TunnelConnectionData, + private readonly socket: WebSocket, + private readonly tunnelData: TunnelConnectionData, + private readonly userSettings: UserProxyAppSettings, ) { - this.userSettings = userSettings; - this.tunnelData = tunnelData; - this.socket = socket; - this.logger = pino( - { - level: 'info', - base: {}, - timestamp: pino.stdTimeFunctions.isoTime, - }, - pinoPretty({ - colorize: true, - }), - ); - this.HttpProxy = new HttpProxy(userSettings, this.logger); - this.WsProxy = new WsProxy(userSettings, this.logger); + this.HttpProxy = new HttpProxy(userSettings); + this.WsProxy = new WsProxy(userSettings); process.env.NODE_TLS_REJECT_UNAUTHORIZED = this.userSettings.insecure.toString(); } @@ -61,43 +44,53 @@ export class TunnelClient { } public onConnectionClose(code: string) { - this.logger.info('disconnected, code:', code); + logger.info('disconnected, code:', code); } public onConnectionError(error: string) { - this.logger.error('wsMain error', error); + logger.error('wsMain error', error); } - private sendResponseToVkProxyServer(data: Buffer | string, callback?: (error?: Error) => void) { - this.socket.send(data, callback ? callback : undefined); + private sendResponseToVkTunnelBack(data: Buffer | string, callback?: (error?: Error) => void) { + this.socket.send(data, callback); } - private parseProxyRequest(query: string): ProxiedNetworkPacket { - const seq = query.slice(this.SEQ_BEGIN, this.SEQ_END); - const messageType = query.slice(this.MSG_TYPE_BEGIN, this.MSG_TYPE_END) as MessageType; - const payload = query.slice(this.PAYLOAD_BEGIN); - const endpoint = payload.split(' ')[1]; + private transformPayload(payload: Buffer[] | ArrayBuffer) { + return payload + .toString() + .replace(/Accept-Encoding:.*/, this.ACCEPT_ENCODING + ': ' + this.DISABLE_COMPRESS) + .replace(/Host: .*/, 'Host: ' + this.userSettings.host); + } - payload.split('\r'); + private parseProxyRequest(data: RawData): ProxiedNetworkPacket { + const seq = data.slice(this.SEQ_BEGIN, this.SEQ_END).toString(); + const rawPayload = data.slice(this.PAYLOAD_BEGIN); + const messageType = data + .slice(this.MSG_TYPE_BEGIN, this.MSG_TYPE_END) + .toString() as MessageTypeFromBack; + + const isWebSocketBinary = messageType === MessageTypeFromBack.WEBSOCKET_BINARY; + const payload = isWebSocketBinary ? rawPayload : this.transformPayload(rawPayload); const parsedRequest = httpParser.parseRequest(payload.toString()); const upgradeHeader = parsedRequest.headers['Upgrade'] || ''; const isWebsocketUpgrade = upgradeHeader.toLowerCase() === 'websocket'; + const endpoint = payload.toString().split(' ')[1]; + return { seq, endpoint, messageType, isWebsocketUpgrade, parsedRequest, payload }; } public async onMessage(data: RawData) { - const query = data.toString(); - const packetData = this.parseProxyRequest(query); + const packetData = this.parseProxyRequest(data); packetData.parsedRequest.headers['Host'] = this.userSettings.host; packetData.parsedRequest.headers[this.ACCEPT_ENCODING] = this.DISABLE_COMPRESS; - if (packetData.messageType === MessageType.HTTP && !packetData.isWebsocketUpgrade) { - this.HttpProxy.proxy(packetData, this.sendResponseToVkProxyServer.bind(this)); + if (packetData.messageType === MessageTypeFromBack.HTTP && !packetData.isWebsocketUpgrade) { + this.HttpProxy.proxy(packetData, this.sendResponseToVkTunnelBack.bind(this)); } else { - this.WsProxy.proxy(packetData, this.sendResponseToVkProxyServer.bind(this)); + this.WsProxy.proxy(packetData, this.sendResponseToVkTunnelBack.bind(this)); } } } diff --git a/src/entities/WsProxy.ts b/src/entities/WsProxy.ts index 63e6966..a1fce40 100644 --- a/src/entities/WsProxy.ts +++ b/src/entities/WsProxy.ts @@ -1,28 +1,17 @@ import WebSocket from 'ws'; -import { Logger } from 'pino'; +import { logger } from './Logger'; import { - MessageType, + MessageTypeToSend, + MessageTypeFromBack, ProxiedNetworkPacket, SendResponseToProxyServer, UserProxyAppSettings, } from '../types'; export class WsProxy { - private static ACCEPT_ENCODING = 'Accept-Encoding'; - private static DISABLE_COMPRESS = 'gzip;q=0,deflate;q=0'; + private connections: Map = new Map(); - private connections: Record = {}; - - public constructor( - private readonly userSettings: UserProxyAppSettings, - private readonly logger: Logger, - ) {} - - private transformPayload(payload: string, proxyHost: string) { - return payload - .replace(/Accept-Encoding:.*/, WsProxy.ACCEPT_ENCODING + ': ' + WsProxy.DISABLE_COMPRESS) - .replace(/Host: .*/, 'Host: ' + proxyHost); - } + public constructor(private readonly userSettings: UserProxyAppSettings) {} private filterWebSocketHeaders(headers: Record) { const allowedHeaders = [ @@ -31,40 +20,47 @@ export class WsProxy { 'Sec-WebSocket-Key', 'Sec-WebSocket-Version', ]; + return Object.fromEntries( Object.entries(headers).filter(([key]) => allowedHeaders.includes(key)), ); } private closeConnection(seq: string) { - this.connections[seq].close(); + this.connections.get(seq)?.close(); } private createConnection( seq: string, - proxiedServerUrl: string, + endpoint: string, headers: Record, sendResponseToVkProxyServer: SendResponseToProxyServer, ) { const subprotocol = headers['Sec-Websocket-Protocol']; - const host = this.userSettings.wsOrigin ? this.userSettings.host : undefined; - const origin = this.userSettings.wsOrigin - ? `${this.userSettings.wsProtocol}://${this.userSettings.host}:${this.userSettings.port}` - : undefined; + const proxiedServerOrigin = `${this.userSettings.wsProtocol}://${this.userSettings.host}:${this.userSettings.port}`; + const proxiedServerUrl = `${proxiedServerOrigin}${endpoint}`; const websocket = new WebSocket(proxiedServerUrl, subprotocol, { - host, - origin, + host: this.userSettings.wsOrigin ? this.userSettings.host : undefined, + origin: this.userSettings.wsOrigin ? proxiedServerOrigin : undefined, headers: this.filterWebSocketHeaders(headers), }); - websocket.on('error', (msg) => this.logger.error('Connection error for ' + seq, msg)); + websocket.on('error', (msg) => logger.error('Connection error for ' + seq, msg)); websocket.on('open', () => { - this.connections[seq].on('message', (data) => { - this.logger.debug('incoming ws message from service', seq, data); - sendResponseToVkProxyServer(`${seq}${MessageType.WEBSOCKET}${data}`, () => { - this.logger.debug('send reply', seq, data); + websocket.on('message', (data, isBinary) => { + logger.debug('incoming ws message from service', seq, data, isBinary); + + const dataBuf = Array.isArray(data) ? Buffer.concat(data) : Buffer.from(data); + const seqBuf = Buffer.from(seq, 'utf8'); + const typeBuf = Buffer.from(MessageTypeToSend.WEBSOCKET, 'utf8'); + const stringMessage = `${seq}${MessageTypeToSend.WEBSOCKET}${data}`; + + const finalMessage = isBinary ? Buffer.concat([seqBuf, typeBuf, dataBuf]) : stringMessage; + + sendResponseToVkProxyServer(finalMessage, () => { + logger.debug('send reply', seq, data, isBinary); }); }); }); @@ -76,41 +72,32 @@ export class WsProxy { ]; const response = responseHeaders.join('\n') + '\n\n'; - sendResponseToVkProxyServer(seq + MessageType.HTTP + response, () => { - this.logger.debug('send reply upgrade', seq, response.toString()); + sendResponseToVkProxyServer(`${seq}${MessageTypeToSend.HTTP}${response}`, () => { + logger.debug('send reply upgrade', seq, response.toString()); }); }); - this.connections[seq] = websocket; + this.connections.set(seq, websocket); } public async proxy( request: ProxiedNetworkPacket, - sendResponseToVkProxyServer: SendResponseToProxyServer, + sendResponseToVkTunnelBack: SendResponseToProxyServer, ) { const { messageType, payload, isWebsocketUpgrade, seq, endpoint, parsedRequest } = request; - if (messageType !== MessageType.HTTP) { - const filteredPayload = this.transformPayload(payload, this.userSettings.host); - - if (messageType === MessageType.WEBSOCKET_CLOSE) { - return this.closeConnection(seq); - } + if (messageType === MessageTypeFromBack.WEBSOCKET_CLOSE) { + return this.closeConnection(seq); + } - this.connections[seq].send(filteredPayload, {}, () => { - this.logger.debug('WS REQUEST', 'seq: ' + seq, messageType, endpoint, filteredPayload); + if (messageType !== MessageTypeFromBack.HTTP) { + this.connections.get(seq)?.send(payload, {}, () => { + logger.debug('WS REQUEST', 'seq: ' + seq, messageType, endpoint, payload); }); } if (isWebsocketUpgrade) { - const proxiedServerUrl = `${this.userSettings.wsProtocol}://${this.userSettings.host}:${this.userSettings.port}${endpoint}`; - - this.createConnection( - seq, - proxiedServerUrl, - parsedRequest.headers, - sendResponseToVkProxyServer, - ); + this.createConnection(seq, endpoint, parsedRequest.headers, sendResponseToVkTunnelBack); } } } diff --git a/src/index.d.ts b/src/index.d.ts new file mode 100644 index 0000000..660a8fc --- /dev/null +++ b/src/index.d.ts @@ -0,0 +1,33 @@ +/** + * Переопределение типизации метода `Buffer.concat` для исправления ошибки в "@types/node". + * + * @remarks + * Типизация в `@types/node` неверно ограничивает первый параметр метода `Buffer.concat`, + * запрещая использовать массивы `Buffer[]`. Однако, это противоречит официальной + * документации Node.js, где указано, что массив `Buffer[]` поддерживается. + * + * @see {@link https://nodejs.org/docs/latest-v20.x/api/buffer.html#static-method-bufferconcatlist-totallength | Node.js Documentation: Buffer.concat} + */ +declare module 'buffer' { + global { + interface BufferConstructor { + concat(list: readonly Uint8Array[] | Buffer[], totalLength?: number): Buffer; + /** + * Copies the underlying memory of `view` into a new `Buffer`. + * + * ```js + * const u16 = new Uint16Array([0, 0xffff]); + * const buf = Buffer.copyBytesFrom(u16, 1, 1); + * u16[1] = 0; + * console.log(buf.length); // 2 + * console.log(buf[0]); // 255 + * console.log(buf[1]); // 255 + * ``` + * @since v19.8.0 + * @param view The {TypedArray} to copy. + * @param [offset=0] The starting offset within `view`. + * @param [length=view.length - offset] The number of elements from `view` to copy. + */ + } + } +} diff --git a/src/types.ts b/src/types.ts index f0f0c0e..9751ee8 100644 --- a/src/types.ts +++ b/src/types.ts @@ -10,9 +10,15 @@ export enum WsProtocol { WSS = 'wss', } -export enum MessageType { +export enum MessageTypeToSend { HTTP = '\0', WEBSOCKET = '\x01', +} + +export enum MessageTypeFromBack { + HTTP = '\0', + WEBSOCKET_BINARY = '\x02', + WEBSOCKET_TEXT = '\x01', WEBSOCKET_CLOSE = '\b', } @@ -51,9 +57,9 @@ export interface UserData { export interface ProxiedNetworkPacket { seq: string; - payload: string; + payload: ArrayBuffer | Buffer[] | string; endpoint: string; - messageType: MessageType; + messageType: MessageTypeFromBack; isWebsocketUpgrade: boolean; parsedRequest: ParseRequestResult; } diff --git a/src/vk-tunnel.ts b/src/vk-tunnel.ts index 49c8a07..7e7b5d8 100644 --- a/src/vk-tunnel.ts +++ b/src/vk-tunnel.ts @@ -1,5 +1,4 @@ #!/usr/bin/env node - import WebsocketClient from 'ws'; import { getTunnelConnectionData, getUserProxyAppSettings } from './helpers'; import { TunnelClient } from './entities'; @@ -14,7 +13,7 @@ async function vkTunnel() { headers: { UserID: String(tunnelData.userId), Token: tunnelData.tunnelToken }, }); - const tunnelClient = new TunnelClient(socket, userSettings, tunnelData); + const tunnelClient = new TunnelClient(socket, tunnelData, userSettings); socket.on('open', () => tunnelClient.onConnectionOpen()); socket.on('close', (code: string) => tunnelClient.onConnectionClose(code));