diff --git a/src/StreamRepository.ts b/src/StreamRepository.ts index f0ffe91..15184a1 100644 --- a/src/StreamRepository.ts +++ b/src/StreamRepository.ts @@ -1,9 +1,9 @@ import { Logger } from '@streamr/utils' -import { Connection, createConnection, RowDataPacket } from 'mysql2/promise' +import { Pool, RowDataPacket, createPool } from 'mysql2/promise' import { Inject, Service } from 'typedi' -import { Config, CONFIG_TOKEN } from './Config' -import { OrderBy, OrderDirection, Stream, Streams, Summary } from './entities' +import { CONFIG_TOKEN, Config } from './Config' import { StreamrClientFacade } from './StreamrClientFacade' +import { OrderBy, OrderDirection, Stream, Streams, Summary } from './entities' import { collect } from './utils' interface StreamRow extends RowDataPacket { @@ -27,15 +27,15 @@ const logger = new Logger(module) @Service() export class StreamRepository { - private readonly connection: Promise - private readonly client: StreamrClientFacade + private readonly connectionPool: Pool + private readonly client: StreamrClientFacade constructor( @Inject() client: StreamrClientFacade, @Inject(CONFIG_TOKEN) config: Config ) { this.client = client - this.connection = createConnection({ + this.connectionPool = createPool({ host: config.database.host, database: config.database.name, user: config.database.user, @@ -85,11 +85,7 @@ export class StreamRepository { // the result set or a token which references to a stateful cache). const offset = (cursor !== undefined) ? parseInt(cursor, 10) : 0 params.push(limit, offset) - const connection = await this.connection - const [ rows ] = await connection.query( - sql, - params - ) + const rows = await this.queryOrExecute(sql, params) return { items: rows, cursor: (rows.length === pageSize) ? String(offset + rows.length) : null @@ -136,19 +132,17 @@ export class StreamRepository { streamCount: number messagesPerSecond: number } - const connection = await this.connection - const [ rows ] = await connection.query( + const rows = await this.queryOrExecute( 'SELECT count(*) as streamCount, sum(messagesPerSecond) as messagesPerSecond FROM streams' ) return rows[0] } async getAllStreams(): Promise<{ id: string, crawlTimestamp: number }[]> { - const connection = await this.connection - const [ rows ] = await connection.query( + const rows = await this.queryOrExecute( 'SELECT id, crawlTimestamp FROM streams' ) - return rows.map((r) => { + return rows.map((r: StreamRow) => { return { id: r.id, crawlTimestamp: Date.parse(r.crawlTimestamp) @@ -157,16 +151,14 @@ export class StreamRepository { } async deleteStream(id: string): Promise { - const connection = await this.connection - await connection.query( + await this.queryOrExecute( 'DELETE FROM streams WHERE id = ?', [id] ) } async replaceStream(stream: Stream): Promise { - const connection = await this.connection - await connection.query( + await this.queryOrExecute( `REPLACE INTO streams ( id, description, peerCount, messagesPerSecond, publisherCount, subscriberCount, crawlTimestamp ) VALUES ( @@ -176,7 +168,20 @@ export class StreamRepository { ) } + private async queryOrExecute(sql: string, params?: any): Promise { + const connection = await this.connectionPool.getConnection() + try { + const [ rows ] = await connection.query( + sql, + params + ) + return rows + } finally { + connection.release() + } + } + async destroy(): Promise { - (await this.connection).destroy() + this.connectionPool.end() } }