Skip to content

Commit

Permalink
Merge pull request #6 from streamr-dev/separate-stream-and-summary
Browse files Browse the repository at this point in the history
Separate `Stream` and `Summary` entity definitions, repositories and resolvers.
  • Loading branch information
teogeb authored Feb 26, 2024
2 parents fa1a2e0 + 35601f7 commit a32dbfa
Show file tree
Hide file tree
Showing 12 changed files with 150 additions and 94 deletions.
3 changes: 2 additions & 1 deletion src/api/APIServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { buildSchema } from 'type-graphql'
import { Container, Inject, Service } from 'typedi'
import { Config, CONFIG_TOKEN } from '../Config'
import { StreamResolver } from './StreamResolver'
import { SummaryResolver } from './SummaryResolver'

const logger = new Logger(module)

Expand All @@ -28,7 +29,7 @@ export class APIServer {

async start(): Promise<void> {
const schema = await buildSchema({
resolvers: [StreamResolver],
resolvers: [StreamResolver, SummaryResolver],
container: Container,
validate: false
})
Expand Down
12 changes: 4 additions & 8 deletions src/api/StreamResolver.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { Arg, Int, Query, Resolver } from 'type-graphql'
import { Inject, Service } from 'typedi'
import { OrderBy, OrderDirection, Streams, Summary } from '../entities'
import { StreamRepository } from '../StreamRepository'
import { OrderDirection } from '../entities/OrderDirection'
import { StreamOrderBy, Streams } from '../entities/Stream'
import { StreamRepository } from '../repository/StreamRepository'

@Resolver()
@Service()
Expand All @@ -20,16 +21,11 @@ export class StreamResolver {
@Arg("ids", () => [String], { nullable: true }) ids?: string[],
@Arg("searchTerm", { nullable: true }) searchTerm?: string,
@Arg("owner", { nullable: true }) owner?: string,
@Arg("orderBy", () => OrderBy, { nullable: true }) orderBy?: OrderBy,
@Arg("orderBy", () => StreamOrderBy, { nullable: true }) orderBy?: StreamOrderBy,
@Arg("orderDirection", () => OrderDirection, { nullable: true }) orderDirection?: OrderDirection,
@Arg("pageSize", () => Int, { nullable: true }) pageSize?: number,
@Arg("cursor", { nullable: true }) cursor?: string,
): Promise<Streams> {
return this.repository.getStreams(ids, searchTerm, owner, orderBy, orderDirection, pageSize, cursor)
}

@Query(() => Summary)
async summary(): Promise<Summary> {
return this.repository.getSummary()
}
}
22 changes: 22 additions & 0 deletions src/api/SummaryResolver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { Query, Resolver } from 'type-graphql'
import { Inject, Service } from 'typedi'
import { Summary } from '../entities/Summary'
import { SummaryRepository } from '../repository/SummaryRepository'

@Resolver()
@Service()
export class SummaryResolver {

private repository: SummaryRepository

constructor(
@Inject() repository: SummaryRepository
) {
this.repository = repository
}

@Query(() => Summary)
async summary(): Promise<Summary> {
return this.repository.getSummary()
}
}
16 changes: 8 additions & 8 deletions src/crawler/Crawler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import pLimit from 'p-limit'
import { DhtAddress, Stream, StreamCreationEvent, StreamMetadata, StreamPermission } from 'streamr-client'
import { Inject, Service } from 'typedi'
import { CONFIG_TOKEN, Config } from '../Config'
import { StreamRepository } from '../StreamRepository'
import { StreamRepository } from '../repository/StreamRepository'
import { StreamrClientFacade } from '../StreamrClientFacade'
import { collect, retry } from '../utils'
import { NetworkNodeFacade } from './NetworkNodeFacade'
Expand Down Expand Up @@ -99,18 +99,18 @@ export const crawlTopology = async (
@Service()
export class Crawler {

private readonly database: StreamRepository
private readonly streamRepository: StreamRepository
private readonly client: StreamrClientFacade
private readonly config: Config
private subscribeGate?: SubscribeGate
private onStreamCreated?: (payload: StreamCreationEvent) => Promise<void>

constructor(
@Inject() database: StreamRepository,
@Inject() streamRepository: StreamRepository,
@Inject() client: StreamrClientFacade,
@Inject(CONFIG_TOKEN) config: Config
) {
this.database = database
this.streamRepository = streamRepository
this.client = client
this.config = config
}
Expand Down Expand Up @@ -156,7 +156,7 @@ export class Crawler {
// the graph-node dependency may not be available immediately after the service has
// been started
const contractStreams = await retry(() => collect(this.client.getAllStreams()), 'Query streams')
const databaseStreams = await this.database.getAllStreams()
const databaseStreams = await this.streamRepository.getAllStreams()
logger.info(`Streams: contract=${contractStreams.length}, database=${databaseStreams.length}`)
const sortedContractStreams = sortBy(contractStreams, getCrawlOrderComparator(databaseStreams))

Expand Down Expand Up @@ -198,7 +198,7 @@ export class Crawler {
const publisherCount = await this.client.getPublisherOrSubscriberCount(id, StreamPermission.PUBLISH)
const subscriberCount = await this.client.getPublisherOrSubscriberCount(id, StreamPermission.SUBSCRIBE)
logger.info(`Replace ${id}`)
await this.database.replaceStream({
await this.streamRepository.replaceStream({
id,
description: metadata.description ?? null,
peerCount: peerIds.size,
Expand All @@ -220,7 +220,7 @@ export class Crawler {
const removedStreamsIds = difference(databaseStreamIds, contractStreamIds)
for (const streamId of removedStreamsIds) {
logger.info(`Delete ${streamId}`)
await this.database.deleteStream(streamId)
await this.streamRepository.deleteStream(streamId)
}
}

Expand All @@ -234,7 +234,7 @@ export class Crawler {
// - assume no peers and no traffic
// - assume that no explicit permissions have been granted yet (the creator
// is the only publisher and subscriber
await this.database.replaceStream({
await this.streamRepository.replaceStream({
id: payload.streamId,
description: payload.metadata.description ?? null,
peerCount: 0,
Expand Down
10 changes: 10 additions & 0 deletions src/entities/OrderDirection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { registerEnumType } from 'type-graphql'

export enum OrderDirection {
ASC = 'ASC',
DESC = 'DESC'
}

registerEnumType(OrderDirection, {
name: 'OrderDirection'
})
26 changes: 5 additions & 21 deletions src/entities.ts → src/entities/Stream.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* eslint-disable indent */
import { Field, Float, Int, ObjectType, registerEnumType } from 'type-graphql'

/* eslint-disable indent */
@ObjectType()
export class Stream {
@Field()
Expand All @@ -17,7 +17,7 @@ export class Stream {
subscriberCount!: number | null
}

export enum OrderBy {
export enum StreamOrderBy {
ID = 'ID',
DESCRIPTION = 'DESCRIPTION',
PEER_COUNT = 'PEER_COUNT',
Expand All @@ -26,31 +26,15 @@ export enum OrderBy {
PUBLISHER_COUNT = 'PUBLISHER_COUNT'
}

registerEnumType(OrderBy, {
name: 'OrderBy'
})

export enum OrderDirection {
ASC = 'ASC',
DESC = 'DESC'
}

registerEnumType(OrderDirection, {
name: 'OrderDirection'
registerEnumType(StreamOrderBy, {
name: 'StreamOrderBy'
})

/* eslint-disable indent */
@ObjectType()
export class Streams {
@Field(() => [Stream])
items!: Stream[]
@Field(() => String, { nullable: true })
cursor!: string | null
}

@ObjectType()
export class Summary {
@Field(() => Int)
streamCount!: number
@Field(() => Float)
messagesPerSecond!: number
}
10 changes: 10 additions & 0 deletions src/entities/Summary.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { Field, Float, Int, ObjectType } from 'type-graphql'

/* eslint-disable indent */
@ObjectType()
export class Summary {
@Field(() => Int)
streamCount!: number
@Field(() => Float)
messagesPerSecond!: number
}
38 changes: 38 additions & 0 deletions src/repository/ConnectionPool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { Pool, RowDataPacket, createPool } from 'mysql2/promise'
import { Inject, Service } from 'typedi'
import { CONFIG_TOKEN, Config } from '../Config'

@Service()
export class ConnectionPool {

private readonly delegatee: Pool

constructor(
@Inject(CONFIG_TOKEN) config: Config
) {
this.delegatee = createPool({
host: config.database.host,
database: config.database.name,
user: config.database.user,
password: config.database.password
})
}

// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
async queryOrExecute<T extends RowDataPacket[]>(sql: string, params?: any): Promise<T> {
const connection = await this.delegatee.getConnection()
try {
const [ rows ] = await connection.query<T>(
sql,
params
)
return rows
} finally {
connection.release()
}
}

async destroy(): Promise<void> {
this.delegatee.end()
}
}
Loading

0 comments on commit a32dbfa

Please sign in to comment.