From e019797a80f5d3737cce919d375b8f88671a924a Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Sat, 17 Feb 2024 01:44:36 +0200 Subject: [PATCH] error handling --- src/crawler/Crawler.ts | 60 ++++++++++++++++++++++-------------------- 1 file changed, 32 insertions(+), 28 deletions(-) diff --git a/src/crawler/Crawler.ts b/src/crawler/Crawler.ts index 921c3d8..b227f7d 100644 --- a/src/crawler/Crawler.ts +++ b/src/crawler/Crawler.ts @@ -228,35 +228,39 @@ export class Crawler { payload: StreamCreationEvent, localNode: NetworkNodeFacade ): Promise { - logger.info(`New stream ${payload.streamId}`) - // first write some data quickly to the database without analyzing the stream - // - assume no peers and no traffic - // - assume that no explicit permissions have been granted yet (the creator - // is the only publisher and subscriber - await this.database.replaceStream({ - id: payload.streamId, - description: payload.metadata.description ?? null, - peerCount: 0, - messagesPerSecond: 0, - publisherCount: 1, - subscriberCount: 1 - }) - // we wait some time so that The Graph has been indexed the new stream - // and it can provider valid publisher and subscriber counts to us - await wait(this.config.crawler.newStreamAnalysisDelay) - const topology = new Topology([]) - for (const partition of range(payload.metadata.partitions)) { - const streamPartId = toStreamPartID(payload.streamId, partition) - const entryPoints = await localNode.fetchStreamPartEntryPoints(streamPartId) - const streamPartTopology = await crawlTopology(localNode, entryPoints, (nodeInfo: NodeInfo) => { - const streamPartition = nodeInfo.streamPartitions.find((streamPartition) => streamPartition.id === streamPartId) - return (streamPartition !== undefined) - ? streamPartition.deliveryLayerNeighbors - : [] - }, `streamPart-${streamPartId}-${Date.now()}`) - topology.addNodeInfos(streamPartTopology.getNodeInfos()) + try { + logger.info(`New stream ${payload.streamId}`) + // first write some data quickly to the database without analyzing the stream + // - assume no peers and no traffic + // - assume that no explicit permissions have been granted yet (the creator + // is the only publisher and subscriber + await this.database.replaceStream({ + id: payload.streamId, + description: payload.metadata.description ?? null, + peerCount: 0, + messagesPerSecond: 0, + publisherCount: 1, + subscriberCount: 1 + }) + // we wait some time so that The Graph has been indexed the new stream + // and it can provider valid publisher and subscriber counts to us + await wait(this.config.crawler.newStreamAnalysisDelay) + const topology = new Topology([]) + for (const partition of range(payload.metadata.partitions)) { + const streamPartId = toStreamPartID(payload.streamId, partition) + const entryPoints = await localNode.fetchStreamPartEntryPoints(streamPartId) + const streamPartTopology = await crawlTopology(localNode, entryPoints, (nodeInfo: NodeInfo) => { + const streamPartition = nodeInfo.streamPartitions.find((streamPartition) => streamPartition.id === streamPartId) + return (streamPartition !== undefined) + ? streamPartition.deliveryLayerNeighbors + : [] + }, `streamPart-${streamPartId}-${Date.now()}`) + topology.addNodeInfos(streamPartTopology.getNodeInfos()) + } + await this.analyzeStream(payload.streamId, payload.metadata, topology, this.subscribeGate!) + } catch (e: any) { + logger.error(`Failed to handle new stream ${payload.streamId}`, e) } - await this.analyzeStream(payload.streamId, payload.metadata, topology, this.subscribeGate!) } stop(): void {