Skip to content

Commit

Permalink
refactor: Upgrade snapshot fetcher and refactor deployer into smaller…
Browse files Browse the repository at this point in the history
… components (#516)

* chore: Bump Snapshot Fetcher and Catalyst Storage

* refactor: Rename fns after major updates

* refactor: Abstract sns publication in publisher component

* refactor: Abstract download entity process in a new port

* feat: Add counter metrics for message published, downloads, and deployments

* refactor: Rename fn + concurrent publishing

* test: Tests new adapters and utils

* Update src/adapters/sns/publisher.ts

Co-authored-by: Alejo Thomas Ortega <[email protected]>
Signed-off-by: Kevin Szuchet <[email protected]>

* Update test/unit/adapters/deployer.spec.ts

Co-authored-by: Alejo Thomas Ortega <[email protected]>
Signed-off-by: Kevin Szuchet <[email protected]>

* test: fix test after removing export

---------

Signed-off-by: Kevin Szuchet <[email protected]>
Co-authored-by: Alejo Thomas Ortega <[email protected]>
  • Loading branch information
kevinszuchet and aleortega authored Jan 2, 2025
1 parent f072ccb commit ea08ce4
Show file tree
Hide file tree
Showing 21 changed files with 1,898 additions and 1,306 deletions.
1 change: 1 addition & 0 deletions .env.default
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@

AWS_REGION=us-east-1
SNS_ARN=
EVENTS_SNS_ARN=
HTTP_SERVER_PORT=3000
HTTP_SERVER_HOST=0.0.0.0
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ dist/
node_modules/
**/.DS_Store
coverage
content
content
.env
9 changes: 3 additions & 6 deletions jest.config.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
module.exports = {
globals: {
"ts-jest": {
tsconfig: "test/tsconfig.json",
},
},
moduleFileExtensions: ["ts", "js"],
transform: {
"^.+\\.(ts|tsx)$": "ts-jest",
"^.+\\.(ts|tsx)$": ["ts-jest", { tsconfig: 'test/tsconfig.json' }]
},
coverageDirectory: "coverage",
collectCoverageFrom: ["src/**/*.ts", "src/**/*.js"],
coveragePathIgnorePatterns: ['/node_modules/', '/src/index.ts'],
testMatch: ["**/*.spec.(ts)"],
testEnvironment: "node",
setupFilesAfterEnv: ['<rootDir>/test/setupTests.ts']
}
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
},
"dependencies": {
"@aws-sdk/client-sns": "^3.616.0",
"@dcl/catalyst-storage": "^2.0.3",
"@dcl/schemas": "^15.1.2",
"@dcl/snapshots-fetcher": "^5.0.4",
"@dcl/catalyst-storage": "^4.3.0",
"@dcl/schemas": "^15.3.3",
"@dcl/snapshots-fetcher": "^9.1.0",
"@well-known-components/env-config-provider": "^1.1.1",
"@well-known-components/http-server": "^1.1.1",
"@well-known-components/interfaces": "^1.2.0",
Expand Down
186 changes: 76 additions & 110 deletions src/adapters/deployer/index.ts
Original file line number Diff line number Diff line change
@@ -1,151 +1,117 @@
import { downloadEntityAndContentFiles } from '@dcl/snapshots-fetcher'
import { IDeployerComponent } from '@dcl/snapshots-fetcher/dist/types'
import { PublishCommand, SNSClient } from '@aws-sdk/client-sns'
import { AppComponents } from '../../types'
import { DeploymentToSqs } from '@dcl/schemas/dist/misc/deployments-to-sqs'
import { Events } from '@dcl/schemas/dist/platform/events'
import { DeployableEntity, IDeployerComponent, TimeRange } from '@dcl/snapshots-fetcher/dist/types'
import { AppComponents, EntityDownloadError, SnsPublisherComponent } from '../../types'

export function createDeployerComponent(
components: Pick<AppComponents, 'logs' | 'storage' | 'downloadQueue' | 'fetch' | 'metrics' | 'sns'>
components: Pick<
AppComponents,
| 'logs'
| 'storage'
| 'downloadQueue'
| 'fetch'
| 'metrics'
| 'snsPublisher'
| 'snsEventPublisher'
| 'entityDownloader'
>
): IDeployerComponent {
const logger = components.logs.getLogger('downloader')
const logger = components.logs.getLogger('Deployer')

const client = new SNSClient({
endpoint: components.sns.optionalSnsEndpoint ? components.sns.optionalSnsEndpoint : undefined
})
async function publishDeploymentNotifications(entity: DeployableEntity, servers: string[]) {
const { snsPublisher, snsEventPublisher } = components

const shouldSendEntityToSns = ['scene', 'wearable', 'emote'].includes(entity.entityType)

const publishers = [shouldSendEntityToSns && snsPublisher, snsEventPublisher]
.filter((publisher): publisher is SnsPublisherComponent => !!publisher)
.map(async (publisher) => await publisher.publishMessage(entity, servers))

await Promise.all(publishers)
}

return {
async deployEntity(entity, servers) {
const markAsDeployed = entity.markAsDeployed ? entity.markAsDeployed : async () => {}
try {
const exists = await components.storage.exist(entity.entityId)
async scheduleEntityDeployment(entity, servers) {
logger.debug('Scheduling entity deployment', {
entityId: entity.entityId,
entityType: entity.entityType
})

const isSnsEntityToSend =
(entity.entityType === 'scene' || entity.entityType === 'wearable' || entity.entityType === 'emote') &&
!!components.sns.arn
const markAsDeployed = entity.markAsDeployed || (async () => {})

const isSnsEventToSend = !!components.sns.eventArn
components.metrics.increment('schedule_entity_deployment_attempt', {
entityType: entity.entityType
})

logger.debug('Handling entity', {
entityId: entity.entityId,
entityType: entity.entityType,
exists: exists ? 'true' : 'false',
isSnsEntityToSend: isSnsEntityToSend ? 'true' : 'false',
isSnsEventToSend: isSnsEventToSend ? 'true' : 'false'
})
try {
const exists = await components.storage.exist(entity.entityId)

if (exists) {
logger.debug('Entity already stored', {
entityId: entity.entityId,
entityType: entity.entityType
})
components.metrics.increment('entity_already_stored', {
entityType: entity.entityType
})
return await markAsDeployed()
}

await components.downloadQueue.onSizeLessThan(1000)

void components.downloadQueue.scheduleJob(async () => {
logger.info('Downloading entity', {
entityId: entity.entityId,
entityType: entity.entityType,
servers: servers.join(',')
})

try {
await downloadEntityAndContentFiles(
{ ...components, fetcher: components.fetch },
entity.entityId,
servers,
new Map(),
'content',
10,
1000
)
} catch (error: any) {
logger.error('Failed to download entity', {
entityId: entity.entityId,
entityType: entity.entityType,
errorMessage: error.message
})
await components.entityDownloader.downloadEntity(entity, servers)

const match = error.message?.match(/status: 4\d{2}/)
await publishDeploymentNotifications(entity, servers)

if (match) {
await markAsDeployed()
}
await markAsDeployed()

return
}
components.metrics.increment('entity_deployment_success', {
entityType: entity.entityType
})
} catch (error: any) {
if (error instanceof EntityDownloadError) {
return
}

logger.info('Entity stored', { entityId: entity.entityId, entityType: entity.entityType })
const isNotRetryable = /status: 4\d{2}/.test(error.message)

const deploymentToSqs: DeploymentToSqs = {
entity,
contentServerUrls: servers
}

// send sns
if (isSnsEntityToSend) {
const receipt = await client.send(
new PublishCommand({
TopicArn: components.sns.arn,
Message: JSON.stringify(deploymentToSqs),
MessageAttributes: {
type: { DataType: 'String', StringValue: Events.Type.CATALYST_DEPLOYMENT },
subType: { DataType: 'String', StringValue: entity.entityType as Events.SubType.CatalystDeployment }
}
})
)
logger.info('Notification sent', {
messageId: receipt.MessageId as any,
sequenceNumber: receipt.SequenceNumber as any,
logger.error('Failed to publish entity', {
entityId: entity.entityId,
entityType: entity.entityType
entityType: entity.entityType,
error: error?.message,
stack: error?.stack
})
}

if (isSnsEventToSend) {
// TODO: this should be a CatalystDeploymentEvent
const deploymentEvent = {
type: Events.Type.CATALYST_DEPLOYMENT,
subType: entity.entityType as Events.SubType.CatalystDeployment,
...deploymentToSqs
} as any

const receipt = await client.send(
new PublishCommand({
TopicArn: components.sns.eventArn,
Message: JSON.stringify(deploymentEvent),
MessageAttributes: {
type: { DataType: 'String', StringValue: deploymentEvent.type },
subType: { DataType: 'String', StringValue: deploymentEvent.subType }
}
})
)
logger.info('Notification sent to events SNS', {
MessageId: receipt.MessageId as any,
SequenceNumber: receipt.SequenceNumber as any,
entityId: entity.entityId,
components.metrics.increment('entity_deployment_failure', {
retryable: isNotRetryable ? 'false' : 'true',
entityType: entity.entityType
})

if (isNotRetryable) {
logger.error('Failed to download entity', {
entityId: entity.entityId,
entityType: entity.entityType,
error: error?.message
})

await markAsDeployed()
}
}
await markAsDeployed()
})
} catch (error: any) {
const isNotRetryable = /status: 4\d{2}/.test(error.message)
logger.error('Failed to publish entity', {
logger.error('Failed to schedule entity deployment', {
entityId: entity.entityId,
entityType: entity.entityType,
error: error?.message,
stack: error?.stack
})

if (isNotRetryable) {
logger.error('Failed to download entity', {
entityId: entity.entityId,
entityType: entity.entityType,
error: error?.message
})
await markAsDeployed()
}
components.metrics.increment('entity_deployment_failure', {
entityType: entity.entityType
})
}
},
async onIdle() {}
async onIdle() {},
async prepareForDeploymentsIn(_timeRanges: TimeRange[]) {}
}
}
58 changes: 58 additions & 0 deletions src/adapters/entity-downloader/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { downloadEntityAndContentFiles } from '@dcl/snapshots-fetcher'
import { DeployableEntity } from '@dcl/snapshots-fetcher/dist/types'
import { AppComponents, EntityDownloaderComponent, EntityDownloadError } from '../../types'

export async function createEntityDownloaderComponent(
components: Pick<AppComponents, 'config' | 'logs' | 'storage' | 'fetch' | 'metrics'>
): Promise<EntityDownloaderComponent> {
const logger = components.logs.getLogger('EntityDownloader')
const maxRetries: number = (await components.config.getNumber('MAX_RETRIES')) || 10
const waitTimeBetweenRetries: number = (await components.config.getNumber('WAIT_TIME_BETWEEN_RETRIES')) || 1000

return {
async downloadEntity(entity: DeployableEntity, servers: string[]) {
const markAsDeployed = entity.markAsDeployed || (async () => {})

logger.info('Downloading entity', {
entityId: entity.entityId,
entityType: entity.entityType,
servers: servers.join(',')
})

try {
await downloadEntityAndContentFiles(
{ ...components, fetcher: components.fetch },
entity.entityId,
servers,
new Map(),
'content',
maxRetries,
waitTimeBetweenRetries
)

components.metrics.increment('entity_download_success', { entityType: entity.entityType })
} catch (error: any) {
logger.error('Failed to download entity', {
entityId: entity.entityId,
entityType: entity.entityType,
errorMessage: error.message
})

components.metrics.increment('entity_download_failure', { entityType: entity.entityType })

const isNonRetryable = error.message?.match(/status: 4\d{2}/)

if (isNonRetryable) {
await markAsDeployed()
}

throw new EntityDownloadError(error.message, {
entity,
error
})
}

logger.info('Entity stored', { entityId: entity.entityId, entityType: entity.entityType })
}
}
}
2 changes: 2 additions & 0 deletions src/adapters/sns/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export { SnsType, SnsOptions } from './types'
export * from './publisher'
Loading

0 comments on commit ea08ce4

Please sign in to comment.