From 492375ccf2bdfe9a5936fa1929daca561d202fd8 Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Mon, 26 Feb 2024 15:05:14 +0200 Subject: [PATCH] store topology neighbors to DB --- initialize-database.sql | 10 +++ src/api/NodeResolver.ts | 28 +++++++- src/entities/Node.ts | 41 ++++++++++- src/repository/NodeRepository.ts | 91 ++++++++++++++++++++++- src/utils.ts | 2 +- test/APIServer.test.ts | 120 +++++++++++++++++++++++++------ test/end-to-end.test.ts | 21 +++++- 7 files changed, 283 insertions(+), 30 deletions(-) diff --git a/initialize-database.sql b/initialize-database.sql index 649f37e..38a668c 100755 --- a/initialize-database.sql +++ b/initialize-database.sql @@ -17,3 +17,13 @@ CREATE TABLE IF NOT EXISTS nodes ( id CHAR(40) NOT NULL PRIMARY KEY, ipAddress VARCHAR(15) ); + +CREATE TABLE IF NOT EXISTS neighbors ( + streamPartId VARCHAR(500) NOT NULL, + nodeId1 CHAR(40) NOT NULL, + nodeId2 CHAR(40) NOT NULL, + PRIMARY KEY (streamPartId, nodeId1, nodeId2), + FOREIGN KEY (nodeId1) REFERENCES nodes(id), + FOREIGN KEY (nodeId2) REFERENCES nodes(id), + INDEX neighbors_streamPartId (streamPartId) +); diff --git a/src/api/NodeResolver.ts b/src/api/NodeResolver.ts index 1a28a8f..ec1c2e7 100644 --- a/src/api/NodeResolver.ts +++ b/src/api/NodeResolver.ts @@ -1,6 +1,8 @@ -import { Arg, Int, Query, Resolver } from 'type-graphql' +import { StreamPartIDUtils } from '@streamr/protocol' +import { FieldNode, GraphQLResolveInfo } from 'graphql' +import { Arg, Info, Int, Query, Resolver } from 'type-graphql' import { Inject, Service } from 'typedi' -import { Nodes } from '../entities/Node' +import { NeighborInput, Neighbors, Nodes } from '../entities/Node' import { NodeRepository } from '../repository/NodeRepository' @Resolver() @@ -17,10 +19,30 @@ export class NodeResolver { @Query(() => Nodes) async nodes( + @Info() info: GraphQLResolveInfo, @Arg("ids", () => [String], { nullable: true }) ids?: string[], + @Arg("streamPart", { nullable: true }) streamPart?: string, + @Arg("neighbor", { nullable: true }) neighbor?: NeighborInput, @Arg("pageSize", () => Int, { nullable: true }) pageSize?: number, @Arg("cursor", { nullable: true }) cursor?: string, ): Promise { - return this.repository.getNodes(ids, pageSize, cursor) + const nodesField = info.fieldNodes[0] + const itemsField = nodesField.selectionSet!.selections[0] as FieldNode + const requestedFields = itemsField.selectionSet!.selections + return this.repository.getNodes( + new Set(requestedFields.map((f: any) => f.name.value)), + ids, + (streamPart !== undefined) ? StreamPartIDUtils.parse(streamPart) : undefined, + neighbor, + pageSize, + cursor + ) + } + + @Query(() => Neighbors) + async neighbors( + @Arg("streamPart", { nullable: false }) streamPart: string, + ): Promise { + return this.repository.getNeighbors(StreamPartIDUtils.parse(streamPart)) } } diff --git a/src/entities/Node.ts b/src/entities/Node.ts index 766dcc8..c090409 100644 --- a/src/entities/Node.ts +++ b/src/entities/Node.ts @@ -1,4 +1,4 @@ -import { Field, ObjectType } from 'type-graphql' +import { Field, InputType, ObjectType } from 'type-graphql' /* eslint-disable indent */ @ObjectType() @@ -7,6 +7,18 @@ export class Node { id!: string @Field(() => String, { nullable: true }) ipAddress!: string | null + @Field(() => [StreamPartNeigbors]) + neighbors!: StreamPartNeigbors[] +} + +/* eslint-disable indent */ +@ObjectType() +export class StreamPartNeigbors { + @Field(() => String, { nullable: false }) + streamPartId!: string | null + // TODO could return Node entities? + @Field(() => [String]) + nodeIds!: string[] } /* eslint-disable indent */ @@ -17,3 +29,30 @@ export class Nodes { @Field(() => String, { nullable: true }) cursor!: string | null } + +/* eslint-disable indent */ +@InputType() +export class NeighborInput { + @Field(() => String, { nullable: false }) + node!: string | null + @Field(() => String, { nullable: false }) + streamPart!: string | null +} + +/* eslint-disable indent */ +@ObjectType() +export class Neighbor { + @Field() + nodeId1!: string + @Field() + nodeId2!: string +} + +/* eslint-disable indent */ +@ObjectType() +export class Neighbors { + @Field(() => [Neighbor]) + items!: Neighbor[] + @Field(() => String, { nullable: true }) + cursor!: string | null +} diff --git a/src/repository/NodeRepository.ts b/src/repository/NodeRepository.ts index dcf6fdf..dd85871 100644 --- a/src/repository/NodeRepository.ts +++ b/src/repository/NodeRepository.ts @@ -1,8 +1,10 @@ +import { DhtAddress } from '@streamr/dht' import { Logger } from '@streamr/utils' import { RowDataPacket } from 'mysql2' +import { StreamPartID } from 'streamr-client' import { Inject, Service } from 'typedi' import { Topology } from '../crawler/Topology' -import { Nodes } from '../entities/Node' +import { NeighborInput, Neighbors, Nodes, StreamPartNeigbors } from '../entities/Node' import { createSqlQuery } from '../utils' import { ConnectionPool } from './ConnectionPool' @@ -11,6 +13,12 @@ interface NodeRow extends RowDataPacket { ipAddress: string | null } +interface NeighborRow extends RowDataPacket { + streamPartId: string + nodeId1: string + nodeId2: string +} + const logger = new Logger(module) @Service() @@ -25,33 +33,110 @@ export class NodeRepository { } async getNodes( + requestedFields: Set, ids?: string[], + streamPartId?: StreamPartID, + neighbor?: NeighborInput, pageSize?: number, cursor?: string ): Promise { - logger.info('Query: getNodes', { ids, pageSize, cursor }) + logger.info('Query: getNodes', { ids, streamPartId, neighbor, pageSize, cursor }) const whereClauses = [] const params = [] if (ids !== undefined) { whereClauses.push('id in (?)') params.push(ids) } + if (streamPartId !== undefined) { + whereClauses.push(` + id in (SELECT nodeId1 FROM neighbors WHERE streamPartId = ?) OR + id in (SELECT nodeId2 FROM neighbors WHERE streamPartId = ?) + `) + params.push(streamPartId, streamPartId) + } + if (neighbor !== undefined) { + whereClauses.push(` + id in (SELECT nodeId1 FROM neighbors WHERE nodeId2 = ? AND streamPartId = ?) OR + id in (SELECT nodeId2 FROM neighbors WHERE nodeId1 = ? AND streamPartId = ?) + `) + params.push(neighbor.node, neighbor.streamPart, neighbor.node, neighbor.streamPart) + } const sql = createSqlQuery( `SELECT id, ipAddress FROM nodes`, whereClauses ) - return this.connectionPool.queryPaginated(sql, params) + const rows = await this.connectionPool.queryPaginated(sql, params) + const items: Nodes['items'] = [] + const includeNeighbors = requestedFields.has('neighbors') + for (const row of rows.items) { + items.push({ + ...row, + neighbors: includeNeighbors ? await this.getStreamPartNeighbors(row.id as DhtAddress) : [], + }) + } + return { + items, + cursor: rows.cursor + } + } + + private async getStreamPartNeighbors(id: DhtAddress): Promise { + const rows = await this.connectionPool.queryOrExecute( + `SELECT streamPartId, nodeId1, nodeId2 FROM neighbors WHERE nodeId1 = ? OR nodeId2 = ?`, + [id, id] + ) + const result: StreamPartNeigbors[] = [] + for (const row of rows) { + const otherNode = (row.nodeId1 === id) ? row.nodeId2 : row.nodeId1 + const item = result.find((i) => i.streamPartId === row.streamPartId) + if (item !== undefined) { + item.nodeIds.push(otherNode) + } else { + result.push({ + streamPartId: row.streamPartId, + nodeIds: [otherNode] + }) + } + } + return result + } + + async getNeighbors( + streamPartId: StreamPartID, + ): Promise { + logger.info('Query: getNeighbors', { streamPartId }) + return this.connectionPool.queryPaginated( + `SELECT nodeId1, nodeId2 FROM neighbors WHERE streamPartId = ?`, + [streamPartId] + ) } async replaceNetworkTopology(topology: Topology): Promise { const nodes = topology.getNodes().map((node) => { return [node.id, node.ipAddress] }) + const neighbors: [StreamPartID, DhtAddress, DhtAddress][] = [] + for (const node of topology.getNodes()) { + for (const streamPartId of node.streamPartNeighbors.keys()) { + const streamPartNeighbors = node.streamPartNeighbors.get(streamPartId)! + for (const neighbor of streamPartNeighbors) { + // If node A and B are neighbors, we assume that there are two associations in the topology: + // A->B and B-A. We don't need to store both associations to the DB. The following comparison + // filters out the duplication. Note that if there is only one side of the association + // in the topology, that association is maybe not stored at all. + if (node.id < neighbor) { + neighbors.push([streamPartId, node.id, neighbor]) + } + } + } + } const connection = await this.connectionPool.getConnection() try { await connection.beginTransaction() + await connection.query('DELETE FROM neighbors') await connection.query('DELETE FROM nodes') await connection.query('INSERT INTO nodes (id, ipAddress) VALUES ?', [nodes]) + await connection.query('INSERT INTO neighbors (streamPartId, nodeId1, nodeId2) VALUES ?', [neighbors]) await connection.commit() } catch (e) { connection.rollback() diff --git a/src/utils.ts b/src/utils.ts index fef03e3..5c48373 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -89,7 +89,7 @@ export const queryAPI = async (query: string, port: number): Promise => { export const createSqlQuery = (select: string, whereClauses: string[], orderByExpression?: string): string => { let sql = select if (whereClauses.length > 0) { - sql += ` WHERE ${whereClauses.join(' AND ')}` + sql += ` WHERE ${whereClauses.map((c) => `(${c})`).join(' AND ')}` } if (orderByExpression !== undefined) { sql += ` ORDER BY ${orderByExpression}` diff --git a/test/APIServer.test.ts b/test/APIServer.test.ts index 975612f..1c3a8be 100644 --- a/test/APIServer.test.ts +++ b/test/APIServer.test.ts @@ -9,10 +9,34 @@ import { StreamRepository } from '../src/repository/StreamRepository' import { createDatabase, queryAPI } from '../src/utils' import { dropTestDatabaseIfExists, TEST_DATABASE_NAME } from './utils' import { NodeRepository } from '../src/repository/NodeRepository' -import { createRandomDhtAddress } from '@streamr/dht' +import { DhtAddress, createRandomDhtAddress } from '@streamr/dht' import { Multimap } from '@streamr/utils' import { StreamPartIDUtils } from '@streamr/protocol' +const TOPOLOGY_STREAM_PART_ID = StreamPartIDUtils.parse('stream#0') + +const storeTestTopology = async ( + node1: DhtAddress, + node2: DhtAddress +) => { + const nodeRepository = Container.get(NodeRepository) + const streamPartNeighbors1 = new Multimap() + streamPartNeighbors1.add(TOPOLOGY_STREAM_PART_ID, node2) + const streamPartNeighbors2 = new Multimap() + streamPartNeighbors2.add(TOPOLOGY_STREAM_PART_ID, node1) + await nodeRepository.replaceNetworkTopology({ + getNodes: () => [{ + id: node1, + streamPartNeighbors: streamPartNeighbors1, + ipAddress: '' + }, { + id: node2, + streamPartNeighbors: streamPartNeighbors2, + ipAddress: '' + }] + } as any) +} + describe('APIServer', () => { let apiPort: number @@ -243,6 +267,80 @@ describe('APIServer', () => { }]) }) + describe('nodes', () => { + + let node1: DhtAddress + let node2: DhtAddress + + beforeEach(async () => { + node1 = createRandomDhtAddress() + node2 = createRandomDhtAddress() + await storeTestTopology(node1, node2) + }) + + it('ids', async () => { + const response = await queryAPI(`{ + nodes(ids: ["${node1}"]) { + items { + id + ipAddress + neighbors { + streamPartId + nodeIds + } + } + } + }`, apiPort) + const node = response['items'][0] + expect(node).toEqual({ + id: node1, + ipAddress: '', + neighbors: [{ + streamPartId: TOPOLOGY_STREAM_PART_ID, + nodeIds: [node2] + }] + }) + }) + + it('streamPart', async () => { + const response = await queryAPI(`{ + nodes(streamPart: "${TOPOLOGY_STREAM_PART_ID}") { + items { + id + } + } + }`, apiPort) + const nodes = response['items'] + expect(nodes.map((n: any) => n.id)).toIncludeSameMembers([node1, node2]) + }) + + it('neighbors of one node in stream part', async () => { + const response = await queryAPI(`{ + nodes(neighbor: {node: "${node1}", streamPart: "${TOPOLOGY_STREAM_PART_ID}"}) { + items { + id + } + } + }`, apiPort) + const nodes = response['items'] + expect(nodes.map((n: any) => n.id)).toEqual([node2]) + }) + + it('all neighbors in stream part', async () => { + const response = await queryAPI(`{ + neighbors(streamPart: "${TOPOLOGY_STREAM_PART_ID}") { + items { + nodeId1 + nodeId2 + } + } + }`, apiPort) + const neighbors = response['items'] + const actualNodes = [neighbors[0].nodeId1, neighbors[0].nodeId2] + expect(actualNodes).toIncludeSameMembers([node1, node2]) + }) + }) + it('summary', async () => { const streamRepository = Container.get(StreamRepository) await streamRepository.replaceStream({ @@ -261,25 +359,7 @@ describe('APIServer', () => { publisherCount: null, subscriberCount: null }) - const streamPartId = StreamPartIDUtils.parse('stream#0') - const nodeRepository = Container.get(NodeRepository) - const node1 = createRandomDhtAddress() - const node2 = createRandomDhtAddress() - const streamPartNeighbors1 = new Multimap() - streamPartNeighbors1.add(streamPartId, node2) - const streamPartNeighbors2 = new Multimap() - streamPartNeighbors2.add(streamPartId, node1) - await nodeRepository.replaceNetworkTopology({ - getNodes: () => [{ - id: node1, - streamPartNeighbors: streamPartNeighbors1, - ipAddress: '' - }, { - id: node2, - streamPartNeighbors: streamPartNeighbors2, - ipAddress: '' - }] - } as any) + await storeTestTopology(createRandomDhtAddress(), createRandomDhtAddress()) const summary = await queryAPI(`{ summary { streamCount diff --git a/test/end-to-end.test.ts b/test/end-to-end.test.ts index 97e9d30..dff1f6f 100644 --- a/test/end-to-end.test.ts +++ b/test/end-to-end.test.ts @@ -1,9 +1,10 @@ import 'reflect-metadata' -import { NodeType, createRandomDhtAddress, getDhtAddressFromRaw, getRawFromDhtAddress } from '@streamr/dht' +import { DhtAddress, NodeType, createRandomDhtAddress, getDhtAddressFromRaw, getRawFromDhtAddress } from '@streamr/dht' +import { StreamPartID, toStreamPartID } from '@streamr/protocol' import { NetworkNode, createNetworkNode } from '@streamr/trackerless-network' import { setAbortableInterval, waitForCondition } from '@streamr/utils' -import { range, uniq } from 'lodash' +import { random, range, uniq } from 'lodash' import StreamrClient, { CONFIG_TEST, NetworkNodeType, PeerDescriptor, StreamID, StreamPermission, StreamrClientConfig } from 'streamr-client' import Container from 'typedi' import { CONFIG_TOKEN } from '../src/Config' @@ -108,6 +109,18 @@ const queryNodes = async (apiPort: number): Promise => { return response['items'] } +const queryNeighbors = async (nodeId: DhtAddress, streamPartId: StreamPartID, apiPort: number): Promise => { + const query = `{ + nodes(neighbor: {node: "${nodeId}", streamPart: "${streamPartId}"}) { + items { + id + } + } + }` + const response = await queryAPI(query, apiPort) + return response['items'].map((n: any) => n.id) +} + export const nextValue = async (source: AsyncIterator): Promise => { const item = source.next() return (await item).value @@ -213,6 +226,10 @@ describe('end-to-end', () => { ]) expect(uniq(nodes.map((n) => n.ipAddress))).toEqual([DOCKER_DEV_LOOPBACK_IP_ADDRESS]) + const randomActiveStreamPartId = toStreamPartID(existingStream.id, random(ACTIVE_PARTITION_COUNT - 1)) + const neighbors = (await queryNeighbors(await publisher.getNodeId(), randomActiveStreamPartId, apiPort))! + expect(neighbors).toEqual([await subscriber.getNodeId()]) + const newStream = await createTestStream() await startPublisherAndSubscriberForStream(newStream.id, publishingAbortControler.signal)