Skip to content

Commit

Permalink
MA-19046: refactor ws proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
pavel-nikitin-2022 committed Nov 25, 2024
1 parent fadffec commit dd2c320
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 118 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## v0.2.4

- Добавили поддержку бинарных WebSocket сообщений. Теперь сервер корректно обрабатывает и пересылает как текстовые, так и бинарные данные через WebSocket.
- Добавили возможность задавать `ws-origin через` через `process.env.PROXY_WS_ORIGIN`.

## v0.2.3

- Добавили поддержку `hot-reload`.
Expand Down
2 changes: 1 addition & 1 deletion src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export const DEFAULT_USER_PROXY_APP_SETTINGS = {
'port': Number(process.env.PROXY_PORT ?? 10888),
'host': process.env.PROXY_HOST ?? 'localhost',
'insecure': 0,
'ws-origin': 1,
'ws-origin': Number(process.env.PROXY_WS_ORIGIN ?? 1),
'app_id': undefined,
'staging': undefined,
'endpoints': undefined,
Expand Down
24 changes: 9 additions & 15 deletions src/entities/HttpProxy.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
import axios, { AxiosResponse, Method } from 'axios';
import { Logger } from 'pino';
import { logger } from './Logger';
import { ProxiedNetworkPacket, SendResponseToProxyServer, UserProxyAppSettings } from '../types';

export class HttpProxy {
private userSettings: UserProxyAppSettings;
private logger: Logger;

public constructor(userSettings: UserProxyAppSettings, logger: Logger) {
this.userSettings = userSettings;
this.logger = logger;
}
public constructor(private readonly userSettings: UserProxyAppSettings) {}

private async getResponseFromProxiedServer(
proxiedServerUrl: string,
Expand All @@ -28,7 +22,7 @@ export class HttpProxy {
}).catch((error) => console.log(error));
}

private generateRawResponseForVkProxyServer(proxiedServerResponse: AxiosResponse) {
private generateHeadersForVkTunnelBack(proxiedServerResponse: AxiosResponse) {
let rawResponse = `HTTP/1.1 ${proxiedServerResponse.status} ${proxiedServerResponse.statusText}\r\n`;
let keys = Object.keys(proxiedServerResponse.headers);

Expand All @@ -54,7 +48,7 @@ export class HttpProxy {

public async proxy(
packetData: ProxiedNetworkPacket,
sendResponseToVkProxyServer: SendResponseToProxyServer,
sendResponseToVkTunnelBack: SendResponseToProxyServer,
) {
const { seq, parsedRequest, messageType, endpoint } = packetData;
const proxiedServerUrl = `${this.userSettings.httpProtocol}://${this.userSettings.host}:${this.userSettings.port}${parsedRequest.uri}`;
Expand All @@ -68,14 +62,14 @@ export class HttpProxy {
const buffer = Buffer.concat([
Buffer.from(seq, 'utf8'),
Buffer.from(messageType, 'utf8'),
Buffer.from(this.generateRawResponseForVkProxyServer(response)),
Buffer.from(this.generateHeadersForVkTunnelBack(response)),
response.data,
]);

sendResponseToVkProxyServer(buffer, () => {
this.logger.debug(
sendResponseToVkTunnelBack(buffer, () => {
logger.debug(
'REQUEST',
`seq: {$seq}`,
`seq: ${seq}`,
`type: ${messageType.charCodeAt(0)}`,
`endpoint: ${endpoint}`,
);
Expand All @@ -86,7 +80,7 @@ export class HttpProxy {
const uri = parsedRequest['uri'] || '-';
const ua = parsedRequest['headers']['User-Agent'] || '-';
const length = response.data.length;
this.logger.info(`${realIp} ${statusCode} ${host} ${method} ${uri} ${ua} ${length}`);
logger.info(`${realIp} ${statusCode} ${host} ${method} ${uri} ${ua} ${length}`);
});
}
}
13 changes: 13 additions & 0 deletions src/entities/Logger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import pino from 'pino';
import pinoPretty from 'pino-pretty';

export const logger = pino(
{
level: 'info',
base: {},
timestamp: pino.stdTimeFunctions.isoTime,
},
pinoPretty({
colorize: true,
}),
);
89 changes: 41 additions & 48 deletions src/entities/TunnelClient.ts
Original file line number Diff line number Diff line change
@@ -1,54 +1,37 @@
import { RawData, WebSocket } from 'ws';
import chalk from 'chalk';
import httpParser from 'http-string-parser';
import pino, { Logger } from 'pino';
import pinoPretty from 'pino-pretty';
import { WsProxy } from './WsProxy';
import { HttpProxy } from './HttpProxy';
import { logger } from './Logger';
import { showSuccessLog } from '../helpers';
import {
MessageType,
MessageTypeFromBack,
ProxiedNetworkPacket,
TunnelConnectionData,
UserProxyAppSettings,
} from '../types';

export class TunnelClient {
private SEQ_BEGIN = 0;
private SEQ_END = 8;
private MSG_TYPE_BEGIN = 8;
private MSG_TYPE_END = 9;
private PAYLOAD_BEGIN = 9;
private DISABLE_COMPRESS = 'gzip;q=0,deflate;q=0';
private ACCEPT_ENCODING = 'Accept-Encoding';

private userSettings: UserProxyAppSettings;
private tunnelData: TunnelConnectionData;
private readonly SEQ_BEGIN = 0;
private readonly SEQ_END = 8;
private readonly MSG_TYPE_BEGIN = 8;
private readonly MSG_TYPE_END = 9;
private readonly PAYLOAD_BEGIN = 9;

private readonly DISABLE_COMPRESS = 'gzip;q=0,deflate;q=0';
private readonly ACCEPT_ENCODING = 'Accept-Encoding';

private HttpProxy: HttpProxy;
private WsProxy: WsProxy;
private socket: WebSocket;
private logger: Logger;

public constructor(
socket: WebSocket,
userSettings: UserProxyAppSettings,
tunnelData: TunnelConnectionData,
private readonly socket: WebSocket,
private readonly tunnelData: TunnelConnectionData,
private readonly userSettings: UserProxyAppSettings,
) {
this.userSettings = userSettings;
this.tunnelData = tunnelData;
this.socket = socket;
this.logger = pino(
{
level: 'info',
base: {},
timestamp: pino.stdTimeFunctions.isoTime,
},
pinoPretty({
colorize: true,
}),
);
this.HttpProxy = new HttpProxy(userSettings, this.logger);
this.WsProxy = new WsProxy(userSettings, this.logger);
this.HttpProxy = new HttpProxy(userSettings);
this.WsProxy = new WsProxy(userSettings);

process.env.NODE_TLS_REJECT_UNAUTHORIZED = this.userSettings.insecure.toString();
}
Expand All @@ -61,43 +44,53 @@ export class TunnelClient {
}

public onConnectionClose(code: string) {
this.logger.info('disconnected, code:', code);
logger.info('disconnected, code:', code);
}

public onConnectionError(error: string) {
this.logger.error('wsMain error', error);
logger.error('wsMain error', error);
}

private sendResponseToVkProxyServer(data: Buffer | string, callback?: (error?: Error) => void) {
this.socket.send(data, callback ? callback : undefined);
private sendResponseToVkTunnelBack(data: Buffer | string, callback?: (error?: Error) => void) {
this.socket.send(data, callback);
}

private parseProxyRequest(query: string): ProxiedNetworkPacket {
const seq = query.slice(this.SEQ_BEGIN, this.SEQ_END);
const messageType = query.slice(this.MSG_TYPE_BEGIN, this.MSG_TYPE_END) as MessageType;
const payload = query.slice(this.PAYLOAD_BEGIN);
const endpoint = payload.split(' ')[1];
private transformPayload(payload: Buffer[] | ArrayBuffer) {
return payload
.toString()
.replace(/Accept-Encoding:.*/, this.ACCEPT_ENCODING + ': ' + this.DISABLE_COMPRESS)
.replace(/Host: .*/, 'Host: ' + this.userSettings.host);
}

payload.split('\r');
private parseProxyRequest(data: RawData): ProxiedNetworkPacket {
const seq = data.slice(this.SEQ_BEGIN, this.SEQ_END).toString();
const rawPayload = data.slice(this.PAYLOAD_BEGIN);
const messageType = data
.slice(this.MSG_TYPE_BEGIN, this.MSG_TYPE_END)
.toString() as MessageTypeFromBack;

const isWebSocketBinary = messageType === MessageTypeFromBack.WEBSOCKET_BINARY;
const payload = isWebSocketBinary ? rawPayload : this.transformPayload(rawPayload);
const parsedRequest = httpParser.parseRequest(payload.toString());

const upgradeHeader = parsedRequest.headers['Upgrade'] || '';
const isWebsocketUpgrade = upgradeHeader.toLowerCase() === 'websocket';

const endpoint = payload.toString().split(' ')[1];

return { seq, endpoint, messageType, isWebsocketUpgrade, parsedRequest, payload };
}

public async onMessage(data: RawData) {
const query = data.toString();
const packetData = this.parseProxyRequest(query);
const packetData = this.parseProxyRequest(data);

packetData.parsedRequest.headers['Host'] = this.userSettings.host;
packetData.parsedRequest.headers[this.ACCEPT_ENCODING] = this.DISABLE_COMPRESS;

if (packetData.messageType === MessageType.HTTP && !packetData.isWebsocketUpgrade) {
this.HttpProxy.proxy(packetData, this.sendResponseToVkProxyServer.bind(this));
if (packetData.messageType === MessageTypeFromBack.HTTP && !packetData.isWebsocketUpgrade) {
this.HttpProxy.proxy(packetData, this.sendResponseToVkTunnelBack.bind(this));
} else {
this.WsProxy.proxy(packetData, this.sendResponseToVkProxyServer.bind(this));
this.WsProxy.proxy(packetData, this.sendResponseToVkTunnelBack.bind(this));
}
}
}
85 changes: 36 additions & 49 deletions src/entities/WsProxy.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,17 @@
import WebSocket from 'ws';
import { Logger } from 'pino';
import { logger } from './Logger';
import {
MessageType,
MessageTypeToSend,
MessageTypeFromBack,
ProxiedNetworkPacket,
SendResponseToProxyServer,
UserProxyAppSettings,
} from '../types';

export class WsProxy {
private static ACCEPT_ENCODING = 'Accept-Encoding';
private static DISABLE_COMPRESS = 'gzip;q=0,deflate;q=0';
private connections: Map<string, WebSocket> = new Map();

private connections: Record<string, WebSocket> = {};

public constructor(
private readonly userSettings: UserProxyAppSettings,
private readonly logger: Logger,
) {}

private transformPayload(payload: string, proxyHost: string) {
return payload
.replace(/Accept-Encoding:.*/, WsProxy.ACCEPT_ENCODING + ': ' + WsProxy.DISABLE_COMPRESS)
.replace(/Host: .*/, 'Host: ' + proxyHost);
}
public constructor(private readonly userSettings: UserProxyAppSettings) {}

private filterWebSocketHeaders(headers: Record<string, string>) {
const allowedHeaders = [
Expand All @@ -31,40 +20,47 @@ export class WsProxy {
'Sec-WebSocket-Key',
'Sec-WebSocket-Version',
];

return Object.fromEntries(
Object.entries(headers).filter(([key]) => allowedHeaders.includes(key)),
);
}

private closeConnection(seq: string) {
this.connections[seq].close();
this.connections.get(seq)?.close();
}

private createConnection(
seq: string,
proxiedServerUrl: string,
endpoint: string,
headers: Record<string, string>,
sendResponseToVkProxyServer: SendResponseToProxyServer,
) {
const subprotocol = headers['Sec-Websocket-Protocol'];
const host = this.userSettings.wsOrigin ? this.userSettings.host : undefined;
const origin = this.userSettings.wsOrigin
? `${this.userSettings.wsProtocol}://${this.userSettings.host}:${this.userSettings.port}`
: undefined;
const proxiedServerOrigin = `${this.userSettings.wsProtocol}://${this.userSettings.host}:${this.userSettings.port}`;
const proxiedServerUrl = `${proxiedServerOrigin}${endpoint}`;

const websocket = new WebSocket(proxiedServerUrl, subprotocol, {
host,
origin,
host: this.userSettings.wsOrigin ? this.userSettings.host : undefined,
origin: this.userSettings.wsOrigin ? proxiedServerOrigin : undefined,
headers: this.filterWebSocketHeaders(headers),
});

websocket.on('error', (msg) => this.logger.error('Connection error for ' + seq, msg));
websocket.on('error', (msg) => logger.error('Connection error for ' + seq, msg));

websocket.on('open', () => {
this.connections[seq].on('message', (data) => {
this.logger.debug('incoming ws message from service', seq, data);
sendResponseToVkProxyServer(`${seq}${MessageType.WEBSOCKET}${data}`, () => {
this.logger.debug('send reply', seq, data);
websocket.on('message', (data, isBinary) => {
logger.debug('incoming ws message from service', seq, data, isBinary);

const dataBuf = Array.isArray(data) ? Buffer.concat(data) : Buffer.from(data);
const seqBuf = Buffer.from(seq, 'utf8');
const typeBuf = Buffer.from(MessageTypeToSend.WEBSOCKET, 'utf8');
const stringMessage = `${seq}${MessageTypeToSend.WEBSOCKET}${data}`;

const finalMessage = isBinary ? Buffer.concat([seqBuf, typeBuf, dataBuf]) : stringMessage;

sendResponseToVkProxyServer(finalMessage, () => {
logger.debug('send reply', seq, data, isBinary);
});
});
});
Expand All @@ -76,41 +72,32 @@ export class WsProxy {
];
const response = responseHeaders.join('\n') + '\n\n';

sendResponseToVkProxyServer(seq + MessageType.HTTP + response, () => {
this.logger.debug('send reply upgrade', seq, response.toString());
sendResponseToVkProxyServer(`${seq}${MessageTypeToSend.HTTP}${response}`, () => {
logger.debug('send reply upgrade', seq, response.toString());
});
});

this.connections[seq] = websocket;
this.connections.set(seq, websocket);
}

public async proxy(
request: ProxiedNetworkPacket,
sendResponseToVkProxyServer: SendResponseToProxyServer,
sendResponseToVkTunnelBack: SendResponseToProxyServer,
) {
const { messageType, payload, isWebsocketUpgrade, seq, endpoint, parsedRequest } = request;

if (messageType !== MessageType.HTTP) {
const filteredPayload = this.transformPayload(payload, this.userSettings.host);

if (messageType === MessageType.WEBSOCKET_CLOSE) {
return this.closeConnection(seq);
}
if (messageType === MessageTypeFromBack.WEBSOCKET_CLOSE) {
return this.closeConnection(seq);
}

this.connections[seq].send(filteredPayload, {}, () => {
this.logger.debug('WS REQUEST', 'seq: ' + seq, messageType, endpoint, filteredPayload);
if (messageType !== MessageTypeFromBack.HTTP) {
this.connections.get(seq)?.send(payload, {}, () => {
logger.debug('WS REQUEST', 'seq: ' + seq, messageType, endpoint, payload);
});
}

if (isWebsocketUpgrade) {
const proxiedServerUrl = `${this.userSettings.wsProtocol}://${this.userSettings.host}:${this.userSettings.port}${endpoint}`;

this.createConnection(
seq,
proxiedServerUrl,
parsedRequest.headers,
sendResponseToVkProxyServer,
);
this.createConnection(seq, endpoint, parsedRequest.headers, sendResponseToVkTunnelBack);
}
}
}
Loading

0 comments on commit dd2c320

Please sign in to comment.