Skip to content

Commit

Permalink
error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
teogeb committed Feb 16, 2024
1 parent e25b333 commit e019797
Showing 1 changed file with 32 additions and 28 deletions.
60 changes: 32 additions & 28 deletions src/crawler/Crawler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,35 +228,39 @@ export class Crawler {
payload: StreamCreationEvent,
localNode: NetworkNodeFacade
): Promise<void> {
logger.info(`New stream ${payload.streamId}`)
// first write some data quickly to the database without analyzing the stream
// - assume no peers and no traffic
// - assume that no explicit permissions have been granted yet (the creator
// is the only publisher and subscriber
await this.database.replaceStream({
id: payload.streamId,
description: payload.metadata.description ?? null,
peerCount: 0,
messagesPerSecond: 0,
publisherCount: 1,
subscriberCount: 1
})
// we wait some time so that The Graph has been indexed the new stream
// and it can provider valid publisher and subscriber counts to us
await wait(this.config.crawler.newStreamAnalysisDelay)
const topology = new Topology([])
for (const partition of range(payload.metadata.partitions)) {
const streamPartId = toStreamPartID(payload.streamId, partition)
const entryPoints = await localNode.fetchStreamPartEntryPoints(streamPartId)
const streamPartTopology = await crawlTopology(localNode, entryPoints, (nodeInfo: NodeInfo) => {
const streamPartition = nodeInfo.streamPartitions.find((streamPartition) => streamPartition.id === streamPartId)
return (streamPartition !== undefined)
? streamPartition.deliveryLayerNeighbors
: []
}, `streamPart-${streamPartId}-${Date.now()}`)
topology.addNodeInfos(streamPartTopology.getNodeInfos())
try {
logger.info(`New stream ${payload.streamId}`)
// first write some data quickly to the database without analyzing the stream
// - assume no peers and no traffic
// - assume that no explicit permissions have been granted yet (the creator
// is the only publisher and subscriber
await this.database.replaceStream({
id: payload.streamId,
description: payload.metadata.description ?? null,
peerCount: 0,
messagesPerSecond: 0,
publisherCount: 1,
subscriberCount: 1
})
// we wait some time so that The Graph has been indexed the new stream
// and it can provider valid publisher and subscriber counts to us
await wait(this.config.crawler.newStreamAnalysisDelay)
const topology = new Topology([])
for (const partition of range(payload.metadata.partitions)) {
const streamPartId = toStreamPartID(payload.streamId, partition)
const entryPoints = await localNode.fetchStreamPartEntryPoints(streamPartId)
const streamPartTopology = await crawlTopology(localNode, entryPoints, (nodeInfo: NodeInfo) => {
const streamPartition = nodeInfo.streamPartitions.find((streamPartition) => streamPartition.id === streamPartId)
return (streamPartition !== undefined)
? streamPartition.deliveryLayerNeighbors
: []
}, `streamPart-${streamPartId}-${Date.now()}`)
topology.addNodeInfos(streamPartTopology.getNodeInfos())
}
await this.analyzeStream(payload.streamId, payload.metadata, topology, this.subscribeGate!)
} catch (e: any) {
logger.error(`Failed to handle new stream ${payload.streamId}`, e)
}
await this.analyzeStream(payload.streamId, payload.metadata, topology, this.subscribeGate!)
}

stop(): void {
Expand Down

0 comments on commit e019797

Please sign in to comment.