Skip to content

Commit

Permalink
Store topology nodes to DB (#7)
Browse files Browse the repository at this point in the history
Store all nodes of a topology to the database. 

## New queries
- A simple GraphQL interface to query all nodes, or a subset of nodes filtered by ids.
- A new `nodeCount` field in the `summary` endpoint.

## Refactoring

`ConnectionPool#queryPaginated` helper method and `createSqlQuery` utility method.

## Future improvements

Should we also store the new nodes and neighbors that we find when we crawl a single newly created stream?
  • Loading branch information
teogeb authored Feb 26, 2024
1 parent a32dbfa commit e86e068
Show file tree
Hide file tree
Showing 15 changed files with 266 additions and 44 deletions.
4 changes: 4 additions & 0 deletions initialize-database.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,7 @@ CREATE TABLE IF NOT EXISTS streams (
INDEX streams_subscriberCount (subscriberCount)
);

-- Nodes of the network topology. One row per node, keyed by the node id.
-- The application's NodeRepository.replaceNetworkTopology deletes all rows
-- and re-inserts the current node set in a single transaction.
CREATE TABLE IF NOT EXISTS nodes (
-- Fixed-width 40-character node identifier.
id CHAR(40) NOT NULL PRIMARY KEY,
-- Dotted-quad IPv4 address (15 characters at most); NULL when the address is not known.
ipAddress VARCHAR(15)
);
4 changes: 4 additions & 0 deletions src/StreamrClientFacade.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ export class StreamrClientFacade {
return this.client.getConfig().network.controlLayer.entryPoints!.map(peerDescriptorTranslator)
}

/**
 * Resolves the node id reported by the wrapped client.
 *
 * @returns the value of `client.getNodeId()`
 */
async getNodeId(): Promise<DhtAddress> {
    const nodeId = await this.client.getNodeId()
    return nodeId
}

/**
 * Destroys the wrapped client and waits for `client.destroy()` to settle.
 */
async destroy(): Promise<void> {
await this.client.destroy()
}
Expand Down
3 changes: 2 additions & 1 deletion src/api/APIServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { Container, Inject, Service } from 'typedi'
import { Config, CONFIG_TOKEN } from '../Config'
import { StreamResolver } from './StreamResolver'
import { SummaryResolver } from './SummaryResolver'
import { NodeResolver } from './NodeResolver'

const logger = new Logger(module)

Expand All @@ -29,7 +30,7 @@ export class APIServer {

async start(): Promise<void> {
const schema = await buildSchema({
resolvers: [StreamResolver, SummaryResolver],
resolvers: [StreamResolver, NodeResolver, SummaryResolver],
container: Container,
validate: false
})
Expand Down
26 changes: 26 additions & 0 deletions src/api/NodeResolver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { Arg, Int, Query, Resolver } from 'type-graphql'
import { Inject, Service } from 'typedi'
import { Nodes } from '../entities/Node'
import { NodeRepository } from '../repository/NodeRepository'

/**
 * GraphQL resolver for the `nodes` query. All data access is delegated to
 * the injected {@link NodeRepository}.
 */
@Resolver()
@Service()
export class NodeResolver {

    private repository: NodeRepository

    constructor(
        @Inject() nodeRepository: NodeRepository
    ) {
        this.repository = nodeRepository
    }

    /**
     * Lists nodes, optionally restricted to the given ids.
     *
     * @param ids optional list of node ids to filter by
     * @param pageSize optional maximum number of items to return
     * @param cursor optional cursor from a previous response
     * @returns whatever the repository returns for these arguments
     */
    @Query(() => Nodes)
    async nodes(
        @Arg('ids', () => [String], { nullable: true }) ids?: string[],
        @Arg('pageSize', () => Int, { nullable: true }) pageSize?: number,
        @Arg('cursor', { nullable: true }) cursor?: string,
    ): Promise<Nodes> {
        const page = await this.repository.getNodes(ids, pageSize, cursor)
        return page
    }
}
10 changes: 10 additions & 0 deletions src/crawler/Crawler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { NetworkNodeFacade } from './NetworkNodeFacade'
import { MAX_SUBSCRIPTION_COUNT, SubscribeGate } from './SubscribeGate'
import { Topology } from './Topology'
import { getMessageRate } from './messageRate'
import { NodeRepository } from '../repository/NodeRepository'

const logger = new Logger(module)

Expand Down Expand Up @@ -100,17 +101,20 @@ export const crawlTopology = async (
export class Crawler {

private readonly streamRepository: StreamRepository
private readonly nodeRepository: NodeRepository
private readonly client: StreamrClientFacade
private readonly config: Config
private subscribeGate?: SubscribeGate
private onStreamCreated?: (payload: StreamCreationEvent) => Promise<void>

/**
 * Stores the injected collaborators on the instance.
 *
 * @param streamRepository repository for stream data
 * @param nodeRepository repository the full-crawl topology is written to
 * @param client facade over the Streamr client
 * @param config configuration resolved through CONFIG_TOKEN
 */
constructor(
@Inject() streamRepository: StreamRepository,
@Inject() nodeRepository: NodeRepository,
@Inject() client: StreamrClientFacade,
@Inject(CONFIG_TOKEN) config: Config
) {
this.streamRepository = streamRepository
this.nodeRepository = nodeRepository
this.client = client
this.config = config
}
Expand All @@ -133,6 +137,7 @@ export class Crawler {
(nodeInfo: NodeInfo) => nodeInfo.controlLayer!.neighbors,
`full-${Date.now()}`
)
await this.nodeRepository.replaceNetworkTopology(topology)
await this.analyzeContractStreams(topology, this.subscribeGate)
} catch (e) {
logger.error('Error', { err: e })
Expand Down Expand Up @@ -256,12 +261,17 @@ export class Crawler {
)
return (streamPartitions.map((sp) => sp.deliveryLayerNeighbors)).flat()
}, `stream-${payload.streamId}-${Date.now()}`)
// TODO could add new nodes and neighbors to NodeRepository?
await this.analyzeStream(payload.streamId, payload.metadata, topology, this.subscribeGate!)
} catch (e: any) {
logger.error(`Failed to handle new stream ${payload.streamId}`, e)
}
}

/**
 * Returns the node id of the client this crawler uses, by delegating to
 * `client.getNodeId()`.
 */
getNodeId(): Promise<DhtAddress> {
return this.client.getNodeId()
}

stop(): void {
logger.info('Stop')
this.client.off('createStream', this.onStreamCreated!)
Expand Down
7 changes: 5 additions & 2 deletions src/crawler/Topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ import { StreamPartIDUtils } from '@streamr/protocol'
import { NodeInfo } from '@streamr/trackerless-network'
import { Multimap } from '@streamr/utils'
import { DhtAddress, StreamPartID } from 'streamr-client'
import { numberToIpv4 } from '../utils'

interface Node {
export interface Node {
id: DhtAddress
streamPartNeighbors: Multimap<StreamPartID, DhtAddress>
ipAddress?: string
}

export class Topology {
Expand All @@ -26,7 +28,8 @@ export class Topology {
const nodeId = getNodeIdFromPeerDescriptor(info.peerDescriptor)
this.nodes.set(nodeId, {
id: nodeId,
streamPartNeighbors
streamPartNeighbors,
ipAddress: (info.peerDescriptor.ipAddress !== undefined) ? numberToIpv4(info.peerDescriptor.ipAddress) : undefined
})
}
}
Expand Down
19 changes: 19 additions & 0 deletions src/entities/Node.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { Field, ObjectType } from 'type-graphql'

/* eslint-disable indent */
/**
 * GraphQL object type for a single node.
 */
@ObjectType()
export class Node {
// Node identifier (non-nullable in the schema).
@Field()
id!: string
// IP address as a string; exposed as a nullable field.
@Field(() => String, { nullable: true })
ipAddress!: string | null
}

/* eslint-disable indent */
/**
 * GraphQL object type for one page of nodes.
 */
@ObjectType()
export class Nodes {
// The nodes on this page.
@Field(() => [Node])
items!: Node[]
// Cursor for requesting the following page; nullable in the schema.
@Field(() => String, { nullable: true })
cursor!: string | null
}
2 changes: 2 additions & 0 deletions src/entities/Summary.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ export class Summary {
streamCount!: number
@Field(() => Float)
messagesPerSecond!: number
@Field(() => Int)
nodeCount!: number
}
28 changes: 26 additions & 2 deletions src/repository/ConnectionPool.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { Pool, RowDataPacket, createPool } from 'mysql2/promise'
import { Pool, RowDataPacket, createPool, PoolConnection } from 'mysql2/promise'
import { Inject, Service } from 'typedi'
import { CONFIG_TOKEN, Config } from '../Config'

const DEFAULT_PAGE_SIZE = 100

@Service()
export class ConnectionPool {

Expand All @@ -19,7 +21,7 @@ export class ConnectionPool {
}

// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
async queryOrExecute<T extends RowDataPacket[]>(sql: string, params?: any): Promise<T> {
async queryOrExecute<T extends RowDataPacket[]>(sql: string, params?: any[]): Promise<T> {
const connection = await this.delegatee.getConnection()
try {
const [ rows ] = await connection.query<T>(
Expand All @@ -32,6 +34,28 @@ export class ConnectionPool {
}
}

/**
 * Runs `sql` with `LIMIT ? OFFSET ?` appended and returns one page of rows.
 *
 * @param sql query without a LIMIT/OFFSET clause
 * @param params placeholder values for `sql`; limit and offset are appended after them
 * @param pageSize maximum number of rows to return; defaults to DEFAULT_PAGE_SIZE
 * @param cursor cursor returned by a previous call; omit it to start from the first row
 * @returns the rows, plus a cursor for the next page, or `null` when this page was not full
 * @throws Error if `cursor` is not a non-negative integer
 */
async queryPaginated<T extends RowDataPacket[]>(
    sql: string, params: any[], pageSize?: number, cursor?: string
): Promise<{ items: T, cursor: string | null }> {
    const limit = pageSize ?? DEFAULT_PAGE_SIZE
    // The cursor is currently just an offset to the result set. We can later implement
    // enhanced cursor functionality if needed (e.g. cursor can be the last item of
    // the result set or a token which references to a stateful cache).
    const offset = (cursor !== undefined) ? parseInt(cursor, 10) : 0
    // Reject malformed cursors here instead of sending `OFFSET NaN` (or a negative
    // offset) to the database.
    if (!Number.isInteger(offset) || (offset < 0)) {
        throw new Error(`Invalid cursor: ${cursor}`)
    }
    const rows = await this.queryOrExecute<T>(
        `${sql} LIMIT ? OFFSET ?`,
        [...params, limit, offset]
    )
    return {
        items: rows,
        // Compare against the effective limit, not the optional `pageSize`: otherwise
        // callers relying on the default page size would never receive a cursor.
        cursor: (rows.length === limit) ? String(offset + rows.length) : null
    }
}

/**
 * Hands out a connection obtained from the underlying pool via
 * `delegatee.getConnection()`.
 *
 * The connection is not released here; the caller has to call `release()` on it
 * when done, as NodeRepository.replaceNetworkTopology does in its `finally` block.
 */
async getConnection(): Promise<PoolConnection> {
return this.delegatee.getConnection()
}

/**
 * Closes the underlying pool.
 *
 * The `end()` promise is awaited so that callers resume only after the pool has
 * closed, and so that a failure rejects this method instead of becoming an
 * unhandled rejection.
 */
async destroy(): Promise<void> {
    await this.delegatee.end()
}
Expand Down
63 changes: 63 additions & 0 deletions src/repository/NodeRepository.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { Logger } from '@streamr/utils'
import { RowDataPacket } from 'mysql2'
import { Inject, Service } from 'typedi'
import { Topology } from '../crawler/Topology'
import { Nodes } from '../entities/Node'
import { createSqlQuery } from '../utils'
import { ConnectionPool } from './ConnectionPool'

/** Row shape of `SELECT id, ipAddress FROM nodes`. */
interface NodeRow extends RowDataPacket {
id: string
ipAddress: string | null
}

const logger = new Logger(module)

/**
 * Database access for the `nodes` table: paginated reads and wholesale
 * replacement of the stored node set.
 */
@Service()
export class NodeRepository {

    private readonly connectionPool: ConnectionPool

    constructor(
        @Inject() connectionPool: ConnectionPool
    ) {
        this.connectionPool = connectionPool
    }

    /**
     * Fetches one page of nodes.
     *
     * @param ids when given, only nodes with these ids are returned; an empty list yields an empty page
     * @param pageSize maximum number of items; the connection pool applies its default when omitted
     * @param cursor cursor from a previous page; omit it to start from the beginning
     * @returns the page of nodes and the cursor produced by `queryPaginated`
     */
    async getNodes(
        ids?: string[],
        pageSize?: number,
        cursor?: string
    ): Promise<Nodes> {
        logger.info('Query: getNodes', { ids, pageSize, cursor })
        const whereClauses: string[] = []
        const params: unknown[] = []
        if (ids !== undefined) {
            if (ids.length === 0) {
                // An empty list would expand to `id in ()`, which is not valid SQL.
                // Nothing can match, so answer without querying.
                return { items: [], cursor: null }
            }
            whereClauses.push('id in (?)')
            params.push(ids)
        }
        // NOTE(review): no ORDER BY is applied, so offset-based paging relies on the
        // database returning rows in a stable order — consider ordering by id.
        const sql = createSqlQuery(
            `SELECT id, ipAddress FROM nodes`,
            whereClauses
        )
        // Forward the paging arguments; without them every call returns the first default-sized page.
        return this.connectionPool.queryPaginated<NodeRow[]>(sql, params, pageSize, cursor)
    }

    /**
     * Replaces the contents of the `nodes` table with the nodes of `topology`.
     * The delete and insert run in one transaction, so a failure leaves the
     * previous contents in place.
     *
     * @param topology source of the nodes to store
     * @throws the original error after rolling the transaction back
     */
    async replaceNetworkTopology(topology: Topology): Promise<void> {
        const nodes = topology.getNodes().map((node) => {
            return [node.id, node.ipAddress]
        })
        const connection = await this.connectionPool.getConnection()
        try {
            await connection.beginTransaction()
            await connection.query('DELETE FROM nodes')
            await connection.query('INSERT INTO nodes (id, ipAddress) VALUES ?', [nodes])
            await connection.commit()
        } catch (e) {
            try {
                // Await the rollback so it completes before the connection is released
                // and cannot surface as an unhandled rejection.
                await connection.rollback()
            } catch (rollbackError) {
                // Keep the original failure as the error seen by the caller.
                logger.warn('Failed to roll back replaceNetworkTopology', { err: rollbackError })
            }
            throw e
        } finally {
            connection.release()
        }
    }
}
28 changes: 7 additions & 21 deletions src/repository/StreamRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Inject, Service } from 'typedi'
import { StreamrClientFacade } from '../StreamrClientFacade'
import { OrderDirection } from '../entities/OrderDirection'
import { StreamOrderBy, Stream, Streams } from '../entities/Stream'
import { collect } from '../utils'
import { collect, createSqlQuery } from '../utils'
import { ConnectionPool } from './ConnectionPool'

interface StreamRow extends RowDataPacket {
Expand All @@ -21,8 +21,6 @@ const EMPTY_SEARCH_RESULT = {
cursor: null
}

const DEFAULT_PAGE_SIZE = 100

const logger = new Logger(module)

@Service()
Expand Down Expand Up @@ -68,24 +66,12 @@ export class StreamRepository {
const streamIds = streams.map((s) => s.id)
params.push(streamIds)
}
const orderByExpression = StreamRepository.formOrderByExpression(orderBy ?? StreamOrderBy.ID, orderDirection ?? OrderDirection.ASC)
const sql = `
SELECT id, description, peerCount, messagesPerSecond, publisherCount, subscriberCount
FROM streams
${(whereClauses.length > 0) ? 'WHERE ' + whereClauses.join(' AND ') : ''}
ORDER BY ${orderByExpression}
LIMIT ? OFFSET ?`
const limit = pageSize ?? DEFAULT_PAGE_SIZE
// The cursor is currently just an offset to the result set. We can later implement
// enhanced cursor functionality if needed (e.g. cursor can be the last item of
// the result set or a token which references to a stateful cache).
const offset = (cursor !== undefined) ? parseInt(cursor, 10) : 0
params.push(limit, offset)
const rows = await this.connectionPool.queryOrExecute<StreamRow[]>(sql, params)
return {
items: rows,
cursor: (rows.length === pageSize) ? String(offset + rows.length) : null
}
const sql = createSqlQuery(
'SELECT id, description, peerCount, messagesPerSecond, publisherCount, subscriberCount FROM streams',
whereClauses,
StreamRepository.formOrderByExpression(orderBy ?? StreamOrderBy.ID, orderDirection ?? OrderDirection.ASC)
)
return this.connectionPool.queryPaginated<StreamRow[]>(sql, params, pageSize, cursor)
}

private static formOrderByExpression(orderBy: StreamOrderBy, orderDirection: OrderDirection) {
Expand Down
23 changes: 17 additions & 6 deletions src/repository/SummaryRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@ import { Inject, Service } from 'typedi'
import { Summary } from '../entities/Summary'
import { ConnectionPool } from './ConnectionPool'

/**
 * Row shape of the aggregate query over `streams`:
 * `count(*) as streamCount, sum(messagesPerSecond) as messagesPerSecond`.
 */
interface StreamSummaryRow extends RowDataPacket {
streamCount: number
messagesPerSecond: number
}

/** Row shape of `SELECT count(*) as nodeCount FROM nodes`. */
interface NodeSummaryRow extends RowDataPacket {
nodeCount: number
}

@Service()
export class SummaryRepository {

Expand All @@ -15,13 +24,15 @@ export class SummaryRepository {
}

async getSummary(): Promise<Summary> {
interface SummaryRow extends RowDataPacket {
streamCount: number
messagesPerSecond: number
}
const rows = await this.connectionPool.queryOrExecute<SummaryRow[]>(
const streamSummaryRows = await this.connectionPool.queryOrExecute<StreamSummaryRow[]>(
'SELECT count(*) as streamCount, sum(messagesPerSecond) as messagesPerSecond FROM streams'
)
return rows[0]
const nodeSummaryRows = await this.connectionPool.queryOrExecute<NodeSummaryRow[]>(
'SELECT count(*) as nodeCount FROM nodes'
)
return {
...streamSummaryRows[0],
...nodeSummaryRows[0]
}
}
}
Loading

0 comments on commit e86e068

Please sign in to comment.