From dda1e8d4046721a5d0bebaf236a65875e58b086e Mon Sep 17 00:00:00 2001
From: Teo Gebhard
Date: Fri, 12 Apr 2024 16:16:47 +0300
Subject: [PATCH] feat: [NET-1290]: Store sample message (#17)

Added a new `sampleMessage` endpoint. It returns a sample message from a specified stream.

- In each crawl iteration, we store one random message for each stream, replacing any previous sample from an earlier iteration
- We collect samples from public streams only

The sample message can be in JSON or binary format. JSON content is returned as a plain string and binary content as a base64-encoded string.
---
 initialize-database.sql             |  6 +++
 src/api/APIServer.ts                |  3 +-
 src/api/MessageResolver.ts          | 36 ++++++++++++++
 src/crawler/Crawler.ts              | 18 ++++++-
 src/crawler/messageRate.ts          | 10 +++-
 src/entities/Message.ts             | 15 ++++++
 src/repository/MessageRepository.ts | 67 ++++++++++++++++++++++++++
 test/APIServer.test.ts              | 58 +++++++++++++++++++++-
 test/MessageRepository.test.ts      | 71 +++++++++++++++++++++++++++
 test/end-to-end.test.ts             | 74 +++++++++++++++++++++--------
 test/messageRate.test.ts            |  6 ++-
 11 files changed, 336 insertions(+), 28 deletions(-)
 create mode 100644 src/api/MessageResolver.ts
 create mode 100644 src/entities/Message.ts
 create mode 100644 src/repository/MessageRepository.ts
 create mode 100644 test/MessageRepository.test.ts

diff --git a/initialize-database.sql b/initialize-database.sql
index efbff8f..fdb2175 100755
--- a/initialize-database.sql
+++ b/initialize-database.sql
@@ -15,6 +15,12 @@ CREATE TABLE IF NOT EXISTS streams (
     INDEX streams_subscriberCount (subscriberCount)
 );
 
+CREATE TABLE IF NOT EXISTS sample_messages (
+    streamId VARCHAR(500) NOT NULL PRIMARY KEY,
+    content MEDIUMBLOB NOT NULL,
+    contentType VARCHAR(50) NOT NULL
+);
+
 CREATE TABLE IF NOT EXISTS nodes (
     id CHAR(40) NOT NULL PRIMARY KEY,
     ipAddress VARCHAR(15)
diff --git a/src/api/APIServer.ts b/src/api/APIServer.ts
index 2dfc959..62f3e7d 100644
--- a/src/api/APIServer.ts
+++ b/src/api/APIServer.ts
@@ -11,6 +11,7 @@ import { Config, CONFIG_TOKEN } from '../Config'
 import { StreamResolver } from './StreamResolver'
 import { SummaryResolver } from './SummaryResolver'
 import { NodeResolver } from './NodeResolver'
+import { MessageResolver } from './MessageResolver'
 
 const logger = new Logger(module)
 
@@ -30,7 +31,7 @@ export class APIServer {
 
     async start(): Promise<void> {
         const schema = await buildSchema({
-            resolvers: [StreamResolver, NodeResolver, SummaryResolver],
+            resolvers: [StreamResolver, MessageResolver, NodeResolver, SummaryResolver],
             container: Container,
             validate: false
         })
diff --git a/src/api/MessageResolver.ts b/src/api/MessageResolver.ts
new file mode 100644
index 0000000..b615e7c
--- /dev/null
+++ b/src/api/MessageResolver.ts
@@ -0,0 +1,36 @@
+import { Arg, Query, Resolver } from 'type-graphql'
+import { Inject, Service } from 'typedi'
+import { ContentType, Message } from '../entities/Message'
+import { MessageRepository } from '../repository/MessageRepository'
+import { toStreamID } from '@streamr/protocol'
+import { binaryToUtf8 } from '@streamr/utils'
+
+@Resolver()
+@Service()
+export class MessageResolver {
+
+    private repository: MessageRepository
+
+    constructor(
+        @Inject() repository: MessageRepository
+    ) {
+        this.repository = repository
+    }
+
+    @Query(() => Message, { nullable: true })
+    async sampleMessage(
+        @Arg("stream", { nullable: false }) streamId: string
+    ): Promise<Message | null> {
+        const message = await this.repository.getSampleMessage(toStreamID(streamId))
+        if (message !== null) {
+            return {
+                content: (message.contentType ===
ContentType.JSON) + ? binaryToUtf8(message.content) + : Buffer.from(message.content).toString('base64'), + contentType: message.contentType + } + } else { + return null + } + } +} diff --git a/src/crawler/Crawler.ts b/src/crawler/Crawler.ts index 28070ab..9a55f6a 100644 --- a/src/crawler/Crawler.ts +++ b/src/crawler/Crawler.ts @@ -15,6 +15,7 @@ import { NetworkNodeFacade } from './NetworkNodeFacade' import { MAX_SUBSCRIPTION_COUNT, SubscribeGate } from './SubscribeGate' import { Topology } from './Topology' import { getMessageRate } from './messageRate' +import { MessageRepository, convertStreamMessageToMessageRow } from '../repository/MessageRepository' const logger = new Logger(module) @@ -97,10 +98,15 @@ export const crawlTopology = async ( return new Topology([...nodeInfos.values()]) } +const isPublicStream = (subscriberCount: number | null) => { + return subscriberCount === null +} + @Service() export class Crawler { private readonly streamRepository: StreamRepository + private readonly messageRepository: MessageRepository private readonly nodeRepository: NodeRepository private readonly client: StreamrClientFacade private readonly config: Config @@ -109,11 +115,13 @@ export class Crawler { constructor( @Inject() streamRepository: StreamRepository, + @Inject() messageRepository: MessageRepository, @Inject() nodeRepository: NodeRepository, @Inject() client: StreamrClientFacade, @Inject(CONFIG_TOKEN) config: Config ) { this.streamRepository = streamRepository + this.messageRepository = messageRepository this.nodeRepository = nodeRepository this.client = client this.config = config @@ -190,18 +198,20 @@ export class Crawler { peersByPartition.set(partition, topology.getPeers(toStreamPartID(id, partition))) } try { + const publisherCount = await this.client.getPublisherOrSubscriberCount(id, StreamPermission.PUBLISH) + const subscriberCount = await this.client.getPublisherOrSubscriberCount(id, StreamPermission.SUBSCRIBE) const peerIds = new Set(...peersByPartition.values()) const messageRate = (peerIds.size > 0) ? await getMessageRate( id, [...peersByPartition.keys()], + isPublicStream(subscriberCount), await this.client.getNetworkNodeFacade(), subscribeGate, this.config ) : { messagesPerSecond: 0, bytesPerSecond: 0 } - const publisherCount = await this.client.getPublisherOrSubscriberCount(id, StreamPermission.PUBLISH) - const subscriberCount = await this.client.getPublisherOrSubscriberCount(id, StreamPermission.SUBSCRIBE) + logger.info(`Replace ${id}`) await this.streamRepository.replaceStream({ id, @@ -212,6 +222,10 @@ export class Crawler { publisherCount, subscriberCount }) + const sampleMessage = (messageRate.sampleMessage !== undefined) + ? convertStreamMessageToMessageRow(messageRate.sampleMessage) + : null + await this.messageRepository.replaceSampleMessage(sampleMessage, id) } catch (e: any) { logger.error(`Failed to analyze ${id}`, e) } diff --git a/src/crawler/messageRate.ts b/src/crawler/messageRate.ts index d1f68d5..d87b69c 100644 --- a/src/crawler/messageRate.ts +++ b/src/crawler/messageRate.ts @@ -10,25 +10,32 @@ const logger = new Logger(module) // If there are many partitions, we approximate the message rate of a stream by analyzing only some of the partitions. // We assume that traffic levels in each partitions are be quite similar. 
 export const MAX_PARTITION_COUNT = 10
+const MAX_MESSAGE_SIZE = 1048576
 
 export interface MessageRate {
     messagesPerSecond: number
     bytesPerSecond: number
+    sampleMessage?: StreamMessage
 }
 
 export const getMessageRate = async (
     streamId: StreamID,
     activePartitions: number[],
+    isPublicStream: boolean,
     node: NetworkNodeFacade,
     subscibeGate: Gate,
     config: Config
 ): Promise<MessageRate> => {
     let messageCount = 0
     let bytesSum = 0
+    let sampleMessage: StreamMessage | undefined = undefined
     const messageListener = (msg: StreamMessage) => {
         if (msg.getStreamId() === streamId) {
             messageCount++
             bytesSum += msg.content.length
+            if ((sampleMessage === undefined) && isPublicStream && (msg.content.length <= MAX_MESSAGE_SIZE)) {
+                sampleMessage = msg
+            }
         }
     }
     node.addMessageListener(messageListener)
@@ -53,7 +60,8 @@
     }
     const rate = {
         messagesPerSecond: calculateRate(messageCount),
-        bytesPerSecond: calculateRate(bytesSum)
+        bytesPerSecond: calculateRate(bytesSum),
+        sampleMessage
     }
     // eslint-disable-next-line max-len
     logger.info(`Message rate ${streamId}: messagesPerSecond=${rate.messagesPerSecond}, bytesPerSecond=${rate.bytesPerSecond}`, { messageCount, samplePartitions, activePartitions })
diff --git a/src/entities/Message.ts b/src/entities/Message.ts
new file mode 100644
index 0000000..a4c0a82
--- /dev/null
+++ b/src/entities/Message.ts
@@ -0,0 +1,15 @@
+import { Field, ObjectType } from 'type-graphql'
+
+export enum ContentType {
+    JSON = 'JSON',
+    BINARY = 'BINARY'
+}
+
+/* eslint-disable indent */
+@ObjectType()
+export class Message {
+    @Field(() => String, { description: 'JSON string if contentType is JSON, otherwise base64-encoded binary content' })
+    content!: string
+    @Field()
+    contentType!: ContentType
+}
diff --git a/src/repository/MessageRepository.ts b/src/repository/MessageRepository.ts
new file mode 100644
index 0000000..507cd1b
--- /dev/null
+++ b/src/repository/MessageRepository.ts
@@ -0,0 +1,67 @@
+import { Inject, Service } from 'typedi'
+import { ConnectionPool } from './ConnectionPool'
+import { StreamID } from '@streamr/protocol'
+import { ContentType } from '../entities/Message'
+import { StreamMessage, ContentType as StreamMessageContentType } from '@streamr/protocol'
+
+export interface MessageRow {
+    content: Uint8Array
+    contentType: ContentType
+}
+
+export const convertStreamMessageToMessageRow = (msg: StreamMessage): MessageRow => {
+    let contentType
+    if (msg.contentType === StreamMessageContentType.JSON) {
+        contentType = ContentType.JSON
+    } else if (msg.contentType === StreamMessageContentType.BINARY) {
+        contentType = ContentType.BINARY
+    } else {
+        throw new Error(`Assertion failed: unknown content type ${msg.contentType}`)
+    }
+    return {
+        content: msg.content,
+        contentType
+    }
+}
+
+@Service()
+export class MessageRepository {
+
+    private readonly connectionPool: ConnectionPool
+
+    constructor(
+        @Inject() connectionPool: ConnectionPool
+    ) {
+        this.connectionPool = connectionPool
+    }
+
+    async getSampleMessage(streamId: StreamID): Promise<MessageRow | null> {
+        const rows = await this.connectionPool.queryOrExecute(
+            'SELECT content, contentType FROM sample_messages WHERE streamId=?
LIMIT 1', + [streamId] + ) + if (rows.length === 1) { + return rows[0] + } else { + return null + } + } + + async replaceSampleMessage(message: MessageRow | null, streamId: StreamID): Promise { + if (message !== null) { + await this.connectionPool.queryOrExecute( + 'REPLACE INTO sample_messages (streamId, content, contentType) VALUES (?, ?, ?)', + [ + streamId, + Buffer.from(message.content), + message.contentType + ] + ) + } else { + await this.connectionPool.queryOrExecute( + 'DELETE FROM sample_messages WHERE streamId=?', + [streamId] + ) + } + } +} diff --git a/test/APIServer.test.ts b/test/APIServer.test.ts index 8360e66..e61d649 100644 --- a/test/APIServer.test.ts +++ b/test/APIServer.test.ts @@ -10,8 +10,11 @@ import { createDatabase, queryAPI } from '../src/utils' import { dropTestDatabaseIfExists, TEST_DATABASE_NAME } from './utils' import { NodeRepository } from '../src/repository/NodeRepository' import { DhtAddress, createRandomDhtAddress } from '@streamr/dht' -import { Multimap } from '@streamr/utils' +import { Multimap, utf8ToBinary } from '@streamr/utils' import { StreamPartID, StreamPartIDUtils } from '@streamr/protocol' +import { MessageRepository } from '../src/repository/MessageRepository' +import { ContentType } from '../src/entities/Message' +import { StreamID } from '@streamr/protocol' const storeTestTopology = async ( streamParts: { @@ -282,6 +285,59 @@ describe('APIServer', () => { }]) }) + describe('sampleMessage', () => { + + it('JSON', async () => { + const streamId = `stream-${Date.now()}` as StreamID + const content = { foo: 'bar' } + const repository = Container.get(MessageRepository) + await repository.replaceSampleMessage({ + content: utf8ToBinary(JSON.stringify(content)), + contentType: ContentType.JSON + }, streamId) + const sample = await queryAPI(`{ + sampleMessage(stream: "${streamId}") { + content + contentType + } + }`, apiPort) + expect(sample).toEqual({ + content: JSON.stringify(content), + contentType: 'JSON' + }) + }) + + it('binary', async () => { + const streamId = `stream-${Date.now()}` as StreamID + const repository = Container.get(MessageRepository) + await repository.replaceSampleMessage({ + content: new Uint8Array([1, 2, 3, 4]), + contentType: ContentType.BINARY + }, streamId) + const sample = await queryAPI(`{ + sampleMessage(stream: "${streamId}") { + content + contentType + } + }`, apiPort) + expect(sample).toEqual({ + content: 'AQIDBA==', + contentType: 'BINARY' + }) + }) + + it('not found', async () => { + const streamId = `stream-${Date.now()}` as StreamID + const sample = await queryAPI(`{ + sampleMessage(stream: "${streamId}") { + content + contentType + } + }`, apiPort) + expect(sample).toBeNull() + }) + }) + describe('nodes', () => { const node1 = createRandomDhtAddress() diff --git a/test/MessageRepository.test.ts b/test/MessageRepository.test.ts new file mode 100644 index 0000000..9088175 --- /dev/null +++ b/test/MessageRepository.test.ts @@ -0,0 +1,71 @@ +import 'reflect-metadata' + +import Container from 'typedi' +import { CONFIG_TOKEN } from '../src/Config' +import { createDatabase } from '../src/utils' +import { TEST_DATABASE_NAME, dropTestDatabaseIfExists } from './utils' +import { StreamID } from '@streamr/protocol' +import { MessageRepository, MessageRow } from '../src/repository/MessageRepository' +import { utf8ToBinary } from '@streamr/utils' +import { ContentType } from '../src/entities/Message' + +const createTestMessage = (msg: { content: Uint8Array, contentType: ContentType }): MessageRow => { + return { + // 
normalize content to Buffer so that we can compare instances with expect().toEqual() + content: Buffer.from(msg.content), + contentType: msg.contentType + } +} + +describe('MessageRepository', () => { + + beforeEach(async () => { + const config = { + database: { + host: '10.200.10.1', + name: TEST_DATABASE_NAME, + user: 'root', + password: 'password' + } + } + await dropTestDatabaseIfExists(config.database) + await createDatabase(config.database) + Container.set(CONFIG_TOKEN, config) + }) + + afterEach(() => { + Container.reset() + }) + + it('create, update, remove', async () => { + const streamId = `stream-${Date.now()}` as StreamID + const otherStreamId = `other-stream-${Date.now()}` as StreamID + const repository = Container.get(MessageRepository) + + // create + const sample1 = createTestMessage({ + content: utf8ToBinary('stream-mock-json-content'), + contentType: ContentType.JSON + }) + await repository.replaceSampleMessage(sample1, streamId) + const otherSample = createTestMessage({ + content: utf8ToBinary('other-stream-mock-json-content'), + contentType: ContentType.JSON + }) + await repository.replaceSampleMessage(otherSample, otherStreamId) + expect(await repository.getSampleMessage(streamId)).toEqual(sample1) + + // update + const sample2 = createTestMessage({ + content: new Uint8Array([1, 2, 3]), + contentType: ContentType.BINARY + }) + await repository.replaceSampleMessage(sample2, streamId) + expect(await repository.getSampleMessage(streamId)).toEqual(sample2) + + // delete + await repository.replaceSampleMessage(null, streamId) + expect(await repository.getSampleMessage(streamId)).toBeNull() + expect(await repository.getSampleMessage(otherStreamId)).toEqual(otherSample) + }) +}) diff --git a/test/end-to-end.test.ts b/test/end-to-end.test.ts index 45b778c..8f056ef 100644 --- a/test/end-to-end.test.ts +++ b/test/end-to-end.test.ts @@ -14,6 +14,7 @@ import { Node } from '../src/entities/Node' import { Stream } from '../src/entities/Stream' import { createDatabase, queryAPI } from '../src/utils' import { TEST_DATABASE_NAME, dropTestDatabaseIfExists } from './utils' +import { Message } from '../src/entities/Message' const PUBLISHER_PRIVATE_KEY = '0x0000000000000000000000000000000000000000000000000000000000000001' const SUBSCRIBER_PRIVATE_KEY = '0x0000000000000000000000000000000000000000000000000000000000000002' @@ -97,6 +98,17 @@ const queryStreamMetrics = async (id: string, apiPort: number): Promise => { + const query = `{ + sampleMessage(stream: "${streamId}") { + content + contentType + } + }` + const response = await queryAPI(query, apiPort) + return response ?? 
undefined +} + const queryNodes = async (apiPort: number): Promise => { const query = `{ nodes { @@ -137,28 +149,30 @@ describe('end-to-end', () => { let crawler: Crawler let apiPort: number - const createTestStream = async () => { + const createTestStream = async (isPublic: boolean) => { const stream = await publisher.createStream({ id: `/test/stream-metrics-index/${Date.now()}`, partitions: PARTITION_COUNT, description: 'mock-description' }) - await stream.grantPermissions({ - user: await subscriber.getAddress(), - permissions: [StreamPermission.SUBSCRIBE] - }) + const permissions = [StreamPermission.SUBSCRIBE] + if (isPublic) { + await stream.grantPermissions({ public: true, permissions }) + } else { + await stream.grantPermissions({ user: await subscriber.getAddress(), permissions }) + } return stream } const startPublisherAndSubscriberForStream = async (streamId: StreamID, publishingAbortControler: AbortSignal) => { - return Promise.all(range(ACTIVE_PARTITION_COUNT).map(async (partition) => { + return Promise.all(range(ACTIVE_PARTITION_COUNT).map(async (partition: number) => { const streamPartDefinition = { streamId: streamId, partition } const subscription = await subscriber.subscribe(streamPartDefinition) setAbortableInterval(async () => { - await publisher.publish(streamPartDefinition, { foo: Date.now() }) + await publisher.publish(streamPartDefinition, { foo: 'bar' }) }, 500, publishingAbortControler) // wait until publisher and subscriber are connected const iterator = subscription[Symbol.asyncIterator]() @@ -206,14 +220,16 @@ describe('end-to-end', () => { it('happy path', async () => { const publishingAbortControler = new AbortController() - const existingStream = await createTestStream() - await startPublisherAndSubscriberForStream(existingStream.id, publishingAbortControler.signal) + const privateStream = await createTestStream(false) + await startPublisherAndSubscriberForStream(privateStream.id, publishingAbortControler.signal) + const publicStream = await createTestStream(true) + await startPublisherAndSubscriberForStream(publicStream.id, publishingAbortControler.signal) crawler = Container.get(Crawler) await crawler.start(1) - const streamMetrics1 = (await queryStreamMetrics(existingStream.id, apiPort))! - expect(streamMetrics1.id).toBe(existingStream.id) + const streamMetrics1 = (await queryStreamMetrics(privateStream.id, apiPort))! + expect(streamMetrics1.id).toBe(privateStream.id) expect(streamMetrics1.description).toBe('mock-description') expect(streamMetrics1.peerCount).toBe(2) expect(streamMetrics1.messagesPerSecond).toBeGreaterThan(0) @@ -221,6 +237,22 @@ describe('end-to-end', () => { expect(streamMetrics1.publisherCount).toBe(1) expect(streamMetrics1.subscriberCount).toBe(2) + const sampleMessage1 = (await querySampleMessage(privateStream.id, apiPort)) + expect(sampleMessage1).toBeUndefined() + + const streamMetrics2 = (await queryStreamMetrics(publicStream.id, apiPort))! + expect(streamMetrics2.id).toBe(publicStream.id) + expect(streamMetrics2.description).toBe('mock-description') + expect(streamMetrics2.peerCount).toBe(2) + expect(streamMetrics2.messagesPerSecond).toBeGreaterThan(0) + expect(streamMetrics2.bytesPerSecond).toBeGreaterThan(0) + expect(streamMetrics2.publisherCount).toBe(1) + expect(streamMetrics2.subscriberCount).toBe(null) + + const sampleMessage2 = (await querySampleMessage(publicStream.id, apiPort))! 
+ expect(sampleMessage2.content).toEqual('{"foo":"bar"}') + expect(sampleMessage2.contentType).toEqual('JSON') + const nodes = (await queryNodes(apiPort))! expect(nodes.map((n) => n.id)).toIncludeSameMembers([ await publisher.getNodeId(), @@ -230,25 +262,25 @@ describe('end-to-end', () => { ]) expect(uniq(nodes.map((n) => n.ipAddress))).toEqual([DOCKER_DEV_LOOPBACK_IP_ADDRESS]) - const randomActiveStreamPartId = toStreamPartID(existingStream.id, random(ACTIVE_PARTITION_COUNT - 1)) + const randomActiveStreamPartId = toStreamPartID(privateStream.id, random(ACTIVE_PARTITION_COUNT - 1)) const neighbors = (await queryNeighbors(await publisher.getNodeId(), randomActiveStreamPartId, apiPort))! expect(neighbors).toEqual([await subscriber.getNodeId()]) - const newStream = await createTestStream() + const newStream = await createTestStream(false) await startPublisherAndSubscriberForStream(newStream.id, publishingAbortControler.signal) await waitForCondition(async () => { const metrics = await queryStreamMetrics(newStream.id, apiPort) return (metrics !== undefined) && (metrics.peerCount >= 2) }, 20 * 1000, 1000) - const streamMetrics2 = (await queryStreamMetrics(newStream.id, apiPort))! - expect(streamMetrics2.id).toBe(newStream.id) - expect(streamMetrics2.description).toBe('mock-description') - expect(streamMetrics2.peerCount).toBe(2) - expect(streamMetrics2.messagesPerSecond).toBeGreaterThan(0) - expect(streamMetrics2.bytesPerSecond).toBeGreaterThan(0) - expect(streamMetrics2.publisherCount).toBe(1) - expect(streamMetrics2.subscriberCount).toBe(2) + const streamMetrics3 = (await queryStreamMetrics(newStream.id, apiPort))! + expect(streamMetrics3.id).toBe(newStream.id) + expect(streamMetrics3.description).toBe('mock-description') + expect(streamMetrics3.peerCount).toBe(2) + expect(streamMetrics3.messagesPerSecond).toBeGreaterThan(0) + expect(streamMetrics3.bytesPerSecond).toBeGreaterThan(0) + expect(streamMetrics3.publisherCount).toBe(1) + expect(streamMetrics3.subscriberCount).toBe(2) publishingAbortControler.abort() crawler.stop() diff --git a/test/messageRate.test.ts b/test/messageRate.test.ts index 46bde80..9cbf68b 100644 --- a/test/messageRate.test.ts +++ b/test/messageRate.test.ts @@ -35,13 +35,14 @@ const createMockNode = (): any => { describe('messageRate', () => { it('happy path', async () => { const node = createMockNode() - const actual = await getMessageRate(STREAM_ID, [1, 4, 5], node, new Gate(true), { + const actual = await getMessageRate(STREAM_ID, [1, 4, 5], true, node, new Gate(true), { crawler: { subscribeDuration: 200 } } as any) expect(actual.messagesPerSecond).toEqual(15) expect(actual.bytesPerSecond).toEqual(1500) + expect(actual.sampleMessage).toBeDefined() expect(node.subscribe).toBeCalledTimes(3) expect(node.subscribe.mock.calls.flat().sort()).toEqual([ toStreamPartID(STREAM_ID, 1), @@ -54,13 +55,14 @@ describe('messageRate', () => { const partitionMultiplier = 4 const partitions = range(MAX_PARTITION_COUNT * partitionMultiplier) const node = createMockNode() - const actual = await getMessageRate(STREAM_ID, partitions, node, new Gate(true), { + const actual = await getMessageRate(STREAM_ID, partitions, true, node, new Gate(true), { crawler: { subscribeDuration: 200 } } as any) expect(actual.messagesPerSecond).toEqual(15 * partitionMultiplier) expect(actual.bytesPerSecond).toEqual(1500 * partitionMultiplier) + expect(actual.sampleMessage).toBeDefined() expect(node.subscribe).toBeCalledTimes(MAX_PARTITION_COUNT) }) })
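
For reference, an example query against the new `sampleMessage` endpoint (illustrative only; the query shape and response values mirror the APIServer and end-to-end tests above, and the stream id is a placeholder):

    {
        sampleMessage(stream: "0x1234/my-stream") {
            content
            contentType
        }
    }

For a JSON message the response carries the payload as a plain JSON string, e.g. content = '{"foo":"bar"}' with contentType = 'JSON'; for a binary message, content is base64-encoded (e.g. 'AQIDBA==') with contentType = 'BINARY'. If no sample has been stored for the stream, sampleMessage is null.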