Skip to content

Commit

Permalink
use connection pool
Browse files Browse the repository at this point in the history
  • Loading branch information
teogeb committed Feb 19, 2024
1 parent 554e849 commit 134cdd0
Showing 1 changed file with 26 additions and 21 deletions.
47 changes: 26 additions & 21 deletions src/StreamRepository.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { Logger } from '@streamr/utils'
import { Connection, createConnection, RowDataPacket } from 'mysql2/promise'
import { Pool, RowDataPacket, createPool } from 'mysql2/promise'
import { Inject, Service } from 'typedi'
import { Config, CONFIG_TOKEN } from './Config'
import { OrderBy, OrderDirection, Stream, Streams, Summary } from './entities'
import { CONFIG_TOKEN, Config } from './Config'
import { StreamrClientFacade } from './StreamrClientFacade'
import { OrderBy, OrderDirection, Stream, Streams, Summary } from './entities'
import { collect } from './utils'

interface StreamRow extends RowDataPacket {
Expand All @@ -27,15 +27,15 @@ const logger = new Logger(module)
@Service()
export class StreamRepository {

private readonly connection: Promise<Connection>
private readonly client: StreamrClientFacade
private readonly connectionPool: Pool
private readonly client: StreamrClientFacade

constructor(
@Inject() client: StreamrClientFacade,
@Inject(CONFIG_TOKEN) config: Config
) {
this.client = client
this.connection = createConnection({
this.connectionPool = createPool({
host: config.database.host,
database: config.database.name,
user: config.database.user,
Expand Down Expand Up @@ -85,11 +85,7 @@ export class StreamRepository {
// the result set or a token which references to a stateful cache).
const offset = (cursor !== undefined) ? parseInt(cursor, 10) : 0
params.push(limit, offset)
const connection = await this.connection
const [ rows ] = await connection.query<StreamRow[]>(
sql,
params
)
const rows = await this.queryOrExecute<StreamRow[]>(sql, params)
return {
items: rows,
cursor: (rows.length === pageSize) ? String(offset + rows.length) : null
Expand Down Expand Up @@ -136,19 +132,17 @@ export class StreamRepository {
streamCount: number
messagesPerSecond: number
}
const connection = await this.connection
const [ rows ] = await connection.query<SummaryRow[]>(
const rows = await this.queryOrExecute<SummaryRow[]>(
'SELECT count(*) as streamCount, sum(messagesPerSecond) as messagesPerSecond FROM streams'
)
return rows[0]
}

async getAllStreams(): Promise<{ id: string, crawlTimestamp: number }[]> {
const connection = await this.connection
const [ rows ] = await connection.query<StreamRow[]>(
const rows = await this.queryOrExecute<StreamRow[]>(
'SELECT id, crawlTimestamp FROM streams'
)
return rows.map((r) => {
return rows.map((r: StreamRow) => {
return {
id: r.id,
crawlTimestamp: Date.parse(r.crawlTimestamp)
Expand All @@ -157,16 +151,14 @@ export class StreamRepository {
}

async deleteStream(id: string): Promise<void> {
const connection = await this.connection
await connection.query(
await this.queryOrExecute(
'DELETE FROM streams WHERE id = ?',
[id]
)
}

async replaceStream(stream: Stream): Promise<void> {
const connection = await this.connection
await connection.query(
await this.queryOrExecute(
`REPLACE INTO streams (
id, description, peerCount, messagesPerSecond, publisherCount, subscriberCount, crawlTimestamp
) VALUES (
Expand All @@ -176,7 +168,20 @@ export class StreamRepository {
)
}

private async queryOrExecute<T extends RowDataPacket[]>(sql: string, params?: any): Promise<T> {
const connection = await this.connectionPool.getConnection()
try {
const [ rows ] = await connection.query<T>(
sql,
params
)
return rows
} finally {
connection.release()
}
}

async destroy(): Promise<void> {
(await this.connection).destroy()
this.connectionPool.end()
}
}

0 comments on commit 134cdd0

Please sign in to comment.