Skip to content

Commit

Permalink
store topology neighbors to DB
Browse files Browse the repository at this point in the history
  • Loading branch information
teogeb committed Feb 26, 2024
1 parent e86e068 commit 492375c
Show file tree
Hide file tree
Showing 7 changed files with 283 additions and 30 deletions.
10 changes: 10 additions & 0 deletions initialize-database.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,13 @@ CREATE TABLE IF NOT EXISTS nodes (
id CHAR(40) NOT NULL PRIMARY KEY,
ipAddress VARCHAR(15)
);

CREATE TABLE IF NOT EXISTS neighbors (
streamPartId VARCHAR(500) NOT NULL,
nodeId1 CHAR(40) NOT NULL,
nodeId2 CHAR(40) NOT NULL,
PRIMARY KEY (streamPartId, nodeId1, nodeId2),
FOREIGN KEY (nodeId1) REFERENCES nodes(id),
FOREIGN KEY (nodeId2) REFERENCES nodes(id),
INDEX neighbors_streamPartId (streamPartId)
);
28 changes: 25 additions & 3 deletions src/api/NodeResolver.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { Arg, Int, Query, Resolver } from 'type-graphql'
import { StreamPartIDUtils } from '@streamr/protocol'
import { FieldNode, GraphQLResolveInfo } from 'graphql'
import { Arg, Info, Int, Query, Resolver } from 'type-graphql'
import { Inject, Service } from 'typedi'
import { Nodes } from '../entities/Node'
import { NeighborInput, Neighbors, Nodes } from '../entities/Node'
import { NodeRepository } from '../repository/NodeRepository'

@Resolver()
Expand All @@ -17,10 +19,30 @@ export class NodeResolver {

@Query(() => Nodes)
async nodes(
@Info() info: GraphQLResolveInfo,
@Arg("ids", () => [String], { nullable: true }) ids?: string[],
@Arg("streamPart", { nullable: true }) streamPart?: string,
@Arg("neighbor", { nullable: true }) neighbor?: NeighborInput,
@Arg("pageSize", () => Int, { nullable: true }) pageSize?: number,
@Arg("cursor", { nullable: true }) cursor?: string,
): Promise<Nodes> {
return this.repository.getNodes(ids, pageSize, cursor)
const nodesField = info.fieldNodes[0]
const itemsField = nodesField.selectionSet!.selections[0] as FieldNode
const requestedFields = itemsField.selectionSet!.selections
return this.repository.getNodes(
new Set(requestedFields.map((f: any) => f.name.value)),
ids,
(streamPart !== undefined) ? StreamPartIDUtils.parse(streamPart) : undefined,
neighbor,
pageSize,
cursor
)
}

@Query(() => Neighbors)
async neighbors(
@Arg("streamPart", { nullable: false }) streamPart: string,
): Promise<Neighbors> {
return this.repository.getNeighbors(StreamPartIDUtils.parse(streamPart))
}
}
41 changes: 40 additions & 1 deletion src/entities/Node.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Field, ObjectType } from 'type-graphql'
import { Field, InputType, ObjectType } from 'type-graphql'

/* eslint-disable indent */
@ObjectType()
Expand All @@ -7,6 +7,18 @@ export class Node {
id!: string
@Field(() => String, { nullable: true })
ipAddress!: string | null
@Field(() => [StreamPartNeigbors])
neighbors!: StreamPartNeigbors[]
}

/* eslint-disable indent */
@ObjectType()
export class StreamPartNeigbors {
@Field(() => String, { nullable: false })
streamPartId!: string | null
// TODO could return Node entities?
@Field(() => [String])
nodeIds!: string[]
}

/* eslint-disable indent */
Expand All @@ -17,3 +29,30 @@ export class Nodes {
@Field(() => String, { nullable: true })
cursor!: string | null
}

/* eslint-disable indent */
@InputType()
export class NeighborInput {
@Field(() => String, { nullable: false })
node!: string | null
@Field(() => String, { nullable: false })
streamPart!: string | null
}

/* eslint-disable indent */
@ObjectType()
export class Neighbor {
@Field()
nodeId1!: string
@Field()
nodeId2!: string
}

/* eslint-disable indent */
@ObjectType()
export class Neighbors {
@Field(() => [Neighbor])
items!: Neighbor[]
@Field(() => String, { nullable: true })
cursor!: string | null
}
91 changes: 88 additions & 3 deletions src/repository/NodeRepository.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { DhtAddress } from '@streamr/dht'
import { Logger } from '@streamr/utils'
import { RowDataPacket } from 'mysql2'
import { StreamPartID } from 'streamr-client'
import { Inject, Service } from 'typedi'
import { Topology } from '../crawler/Topology'
import { Nodes } from '../entities/Node'
import { NeighborInput, Neighbors, Nodes, StreamPartNeigbors } from '../entities/Node'
import { createSqlQuery } from '../utils'
import { ConnectionPool } from './ConnectionPool'

Expand All @@ -11,6 +13,12 @@ interface NodeRow extends RowDataPacket {
ipAddress: string | null
}

interface NeighborRow extends RowDataPacket {
streamPartId: string
nodeId1: string
nodeId2: string
}

const logger = new Logger(module)

@Service()
Expand All @@ -25,33 +33,110 @@ export class NodeRepository {
}

async getNodes(
requestedFields: Set<string>,
ids?: string[],
streamPartId?: StreamPartID,
neighbor?: NeighborInput,
pageSize?: number,
cursor?: string
): Promise<Nodes> {
logger.info('Query: getNodes', { ids, pageSize, cursor })
logger.info('Query: getNodes', { ids, streamPartId, neighbor, pageSize, cursor })
const whereClauses = []
const params = []
if (ids !== undefined) {
whereClauses.push('id in (?)')
params.push(ids)
}
if (streamPartId !== undefined) {
whereClauses.push(`
id in (SELECT nodeId1 FROM neighbors WHERE streamPartId = ?) OR
id in (SELECT nodeId2 FROM neighbors WHERE streamPartId = ?)
`)
params.push(streamPartId, streamPartId)
}
if (neighbor !== undefined) {
whereClauses.push(`
id in (SELECT nodeId1 FROM neighbors WHERE nodeId2 = ? AND streamPartId = ?) OR
id in (SELECT nodeId2 FROM neighbors WHERE nodeId1 = ? AND streamPartId = ?)
`)
params.push(neighbor.node, neighbor.streamPart, neighbor.node, neighbor.streamPart)
}
const sql = createSqlQuery(
`SELECT id, ipAddress FROM nodes`,
whereClauses
)
return this.connectionPool.queryPaginated<NodeRow[]>(sql, params)
const rows = await this.connectionPool.queryPaginated<NodeRow[]>(sql, params)
const items: Nodes['items'] = []
const includeNeighbors = requestedFields.has('neighbors')
for (const row of rows.items) {
items.push({
...row,
neighbors: includeNeighbors ? await this.getStreamPartNeighbors(row.id as DhtAddress) : [],
})
}
return {
items,
cursor: rows.cursor
}
}

private async getStreamPartNeighbors(id: DhtAddress): Promise<StreamPartNeigbors[]> {
const rows = await this.connectionPool.queryOrExecute<NeighborRow[]>(
`SELECT streamPartId, nodeId1, nodeId2 FROM neighbors WHERE nodeId1 = ? OR nodeId2 = ?`,
[id, id]
)
const result: StreamPartNeigbors[] = []
for (const row of rows) {
const otherNode = (row.nodeId1 === id) ? row.nodeId2 : row.nodeId1
const item = result.find((i) => i.streamPartId === row.streamPartId)
if (item !== undefined) {
item.nodeIds.push(otherNode)
} else {
result.push({
streamPartId: row.streamPartId,
nodeIds: [otherNode]
})
}
}
return result
}

async getNeighbors(
streamPartId: StreamPartID,
): Promise<Neighbors> {
logger.info('Query: getNeighbors', { streamPartId })
return this.connectionPool.queryPaginated<NeighborRow[]>(
`SELECT nodeId1, nodeId2 FROM neighbors WHERE streamPartId = ?`,
[streamPartId]
)
}

async replaceNetworkTopology(topology: Topology): Promise<void> {
const nodes = topology.getNodes().map((node) => {
return [node.id, node.ipAddress]
})
const neighbors: [StreamPartID, DhtAddress, DhtAddress][] = []
for (const node of topology.getNodes()) {
for (const streamPartId of node.streamPartNeighbors.keys()) {
const streamPartNeighbors = node.streamPartNeighbors.get(streamPartId)!
for (const neighbor of streamPartNeighbors) {
// If node A and B are neighbors, we assume that there are two associations in the topology:
// A->B and B-A. We don't need to store both associations to the DB. The following comparison
// filters out the duplication. Note that if there is only one side of the association
// in the topology, that association is maybe not stored at all.
if (node.id < neighbor) {
neighbors.push([streamPartId, node.id, neighbor])
}
}
}
}
const connection = await this.connectionPool.getConnection()
try {
await connection.beginTransaction()
await connection.query('DELETE FROM neighbors')
await connection.query('DELETE FROM nodes')
await connection.query('INSERT INTO nodes (id, ipAddress) VALUES ?', [nodes])
await connection.query('INSERT INTO neighbors (streamPartId, nodeId1, nodeId2) VALUES ?', [neighbors])
await connection.commit()
} catch (e) {
connection.rollback()
Expand Down
2 changes: 1 addition & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ export const queryAPI = async (query: string, port: number): Promise<any> => {
export const createSqlQuery = (select: string, whereClauses: string[], orderByExpression?: string): string => {
let sql = select
if (whereClauses.length > 0) {
sql += ` WHERE ${whereClauses.join(' AND ')}`
sql += ` WHERE ${whereClauses.map((c) => `(${c})`).join(' AND ')}`
}
if (orderByExpression !== undefined) {
sql += ` ORDER BY ${orderByExpression}`
Expand Down
Loading

0 comments on commit 492375c

Please sign in to comment.