Skip to content

Commit

Permalink
other approach
Browse files Browse the repository at this point in the history
  • Loading branch information
leanmendoza committed Nov 19, 2024
1 parent a8b3078 commit cf7e301
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 82 deletions.
2 changes: 2 additions & 0 deletions content/src/components.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ export async function initComponentsWithEnv(env: Environment): Promise<AppCompon
const deployedEntitiesBloomFilter = createDeployedEntitiesBloomFilter({ database, logs, clock })
const activeEntities = createActiveEntitiesComponent({ database, env, logs, metrics, denylist, sequentialExecutor })

await activeEntities.initialize(database)

const deployer = createDeployer({
metrics,
storage,
Expand Down
17 changes: 17 additions & 0 deletions content/src/controller/handlers/active-entities-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,20 @@ export async function getActiveEntitiesHandler(
body: entities
}
}

// Method: GET
export async function getActiveEntitiesScenesHandler(
context: HandlerContextWithPath<'activeEntities' | 'denylist', '/entities/active/scenes'>
): Promise<{ status: 200; body: Pick<Entity, 'id' | 'pointers' | 'timestamp'>[] }> {
const { activeEntities, denylist } = context.components
const entities: Entity[] = activeEntities.getAllCachedScenes().filter((result) => !denylist.isDenylisted(result.id))
const mapping = entities.map((entity) => ({
id: entity.id,
pointers: entity.pointers,
timestamp: entity.timestamp
}))
return {
status: 200,
body: mapping
}
}
41 changes: 0 additions & 41 deletions content/src/controller/handlers/active-entities-ids-handler.ts

This file was deleted.

23 changes: 11 additions & 12 deletions content/src/controller/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,24 @@ import { Router } from '@well-known-components/http-server'
import { multipartParserWrapper } from '@well-known-components/multipart-wrapper'
import { EnvironmentConfig } from '../Environment'
import { GlobalContext } from '../types'
import { getActiveEntitiesHandler } from './handlers/active-entities-handler'
import { getActiveEntitiesHandler, getActiveEntitiesScenesHandler } from './handlers/active-entities-handler'
import { createEntity } from './handlers/create-entity-handler'
import { createErrorHandler, preventExecutionIfBoostrapping } from './middlewares'
import { getFailedDeploymentsHandler } from './handlers/failed-deployments-handler'
import { getEntitiesByPointerPrefixHandler } from './handlers/filter-by-urn-handler'
import { getActiveEntityIdsByDeploymentHashHandler } from './handlers/get-active-entities-by-deployment-hash-handler'
import { getEntityAuditInformationHandler } from './handlers/get-audit-handler'
import { getAvailableContentHandler } from './handlers/get-available-content-handler'
import { getPointerChangesHandler } from './handlers/pointer-changes-handler'
import { getStatusHandler } from './handlers/status-handler'
import { getSnapshotsHandler } from './handlers/get-snapshots-handler'
import { getEntitiesHandler } from './handlers/get-entities-handler'
import { getChallengeHandler } from './handlers/get-challenge-handler'
import { getContentHandler } from './handlers/get-content-handler'
import { getEntityThumbnailHandler } from './handlers/get-entity-thumbnail-handler'
import { getDeploymentsHandler } from './handlers/get-deployments-handler'
import { getEntitiesHandler } from './handlers/get-entities-handler'
import { getEntityImageHandler } from './handlers/get-entity-image-handler'
import { getEntityThumbnailHandler } from './handlers/get-entity-thumbnail-handler'
import { getERC721EntityHandler } from './handlers/get-erc721-entity-handler'
import { getDeploymentsHandler } from './handlers/get-deployments-handler'
import { getChallengeHandler } from './handlers/get-challenge-handler'
import { getActiveEntityIdsByDeploymentHashHandler } from './handlers/get-active-entities-by-deployment-hash-handler'
import { getActiveEntitiesIdsHandler } from './handlers/active-entities-ids-handler'
import { getSnapshotsHandler } from './handlers/get-snapshots-handler'
import { getPointerChangesHandler } from './handlers/pointer-changes-handler'
import { getStatusHandler } from './handlers/status-handler'
import { createErrorHandler, preventExecutionIfBoostrapping } from './middlewares'

// We return the entire router because it will be easier to test than a whole server
export async function setupRouter({ components }: GlobalContext): Promise<Router<GlobalContext>> {
Expand All @@ -43,7 +42,7 @@ export async function setupRouter({ components }: GlobalContext): Promise<Router
router.get('/entities/:type', getEntitiesHandler) // TODO: Deprecate
router.get('/entities/active/collections/:collectionUrn', getEntitiesByPointerPrefixHandler)
router.post('/entities/active', getActiveEntitiesHandler)
router.post('/entities/active/ids', getActiveEntitiesIdsHandler)
router.get('/entities/active/scenes', getActiveEntitiesScenesHandler)
router.head('/contents/:hashId', getContentHandler)
router.get('/contents/:hashId', getContentHandler)
router.get('/available-content', getAvailableContentHandler)
Expand Down
44 changes: 29 additions & 15 deletions content/src/logic/deployments.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,13 +224,16 @@ export async function getDeployments(
export async function getDeploymentsForActiveEntities(
database: DatabaseClient,
entityIds?: string[],
pointers?: string[]
pointers?: string[],
entityType?: EntityType
): Promise<Deployment[]> {
// Generate the select according the info needed
const bothPresent = entityIds && entityIds.length > 0 && pointers && pointers.length > 0
const nonePresent = !entityIds && !pointers
if (bothPresent || nonePresent) {
throw Error('in getDeploymentsForActiveEntities ids or pointers must be present, but not both')
// Validate that only one parameter is provided
const providedParams = [entityIds && entityIds.length > 0, pointers && pointers.length > 0, entityType].filter(
Boolean
)

if (providedParams.length !== 1) {
throw Error('getDeploymentsForActiveEntities requires exactly one of: entityIds, pointers, or entityType')
}

const query: SQLStatement = SQL`
Expand All @@ -248,15 +251,23 @@ export async function getDeploymentsForActiveEntities(
FROM deployments AS dep1
WHERE dep1.deleter_deployment IS NULL
AND `.append(
entityIds
entityIds && entityIds.length > 0
? SQL`dep1.entity_id = ANY (${entityIds})`
: SQL`dep1.entity_pointers && ${pointers!.map((p) => p.toLowerCase())}`
: pointers && pointers.length > 0
? SQL`dep1.entity_pointers && ${pointers.map((p) => p.toLowerCase())}`
: SQL`dep1.entity_type = ${entityType}`
)

const historicalDeploymentsResponse = await database.queryWithValues(query, 'get_active_entities')
const BATCH_SIZE = 1000
const deploymentsResult: HistoricalDeployment[] = []
const cursor = await database.streamQuery<HistoricalDeploymentsRow>(
query,
{ batchSize: BATCH_SIZE },
'get_active_entities'
)

const deploymentsResult: HistoricalDeployment[] = historicalDeploymentsResponse.rows.map(
(row: HistoricalDeploymentsRow): HistoricalDeployment => ({
for await (const row of cursor) {
deploymentsResult.push({
deploymentId: row.id,
entityType: row.entity_type,
entityId: row.entity_id,
Expand All @@ -269,12 +280,15 @@ export async function getDeploymentsForActiveEntities(
localTimestamp: row.local_timestamp,
overwrittenBy: row.overwritten_by ?? undefined
})
)

const deploymentIds = deploymentsResult.map(({ deploymentId }) => deploymentId)
}

const content = await getContentFiles(database, deploymentIds)
// Batch fetch all content files at once
const content = await getContentFiles(
database,
deploymentsResult.map((d) => d.deploymentId)
)

// Map results to final format
return deploymentsResult.map((result) => ({
entityVersion: result.version as EntityVersion,
entityType: result.entityType as EntityType,
Expand Down
108 changes: 94 additions & 14 deletions content/src/ports/activeEntities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ export type ActiveEntities = IBaseComponent & {
* Note: only used in stale profiles GC
*/
clearPointers(pointers: string[]): Promise<void>

/**
* Initialize the cache with the active entities for the cached entity types
*/
initialize(database: DatabaseClient): Promise<void>

/**
* Get all cached scenes
*/
getAllCachedScenes(): Entity[]
}

/**
Expand All @@ -81,9 +91,6 @@ export function createActiveEntitiesComponent(
components: Pick<AppComponents, 'database' | 'env' | 'logs' | 'metrics' | 'denylist' | 'sequentialExecutor'>
): ActiveEntities {
const logger = components.logs.getLogger('ActiveEntities')
const cache = new LRU<string, Entity | NotActiveEntity>({
max: components.env.getConfig(EnvironmentConfig.ENTITIES_CACHE_SIZE)
})

const collectionUrnsByPrefixCache = new LRU<string, string[]>({
ttl: 1000 * 60 * 60 * 24, // 24 hours
Expand All @@ -94,6 +101,47 @@ export function createActiveEntitiesComponent(

const normalizePointerCacheKey = (pointer: string) => pointer.toLowerCase()

const cache = new LRU<string, Entity | NotActiveEntity>({
max: components.env.getConfig(EnvironmentConfig.ENTITIES_CACHE_SIZE)
})
const fixedCache = new Map<string, Entity | NotActiveEntity>()

const createLRUandFixedCache = (maxLRU: number, fixedTypes: EntityType[]) => {
return {
get(entityId: string) {
return cache.get(entityId) || fixedCache.get(entityId)
},
set(entityId: string, entity: Entity | NotActiveEntity) {
const isFixed = fixedCache.has(entityId) || (typeof entity !== 'string' && fixedTypes.includes(entity.type))
if (isFixed) {
return fixedCache.set(entityId, entity)
}
return cache.set(entityId, entity)
},
setFixed(entityId: string, entity: Entity | NotActiveEntity) {
return fixedCache.set(entityId, entity)
},
get max() {
return cache.max
},
clear() {
cache.clear()
fixedCache.clear()
},
has(entityId: string) {
return cache.has(entityId) || fixedCache.has(entityId)
}
}
}

const cachedEntityntityTypes = [EntityType.SCENE]

// Entities cache by key=entityId
const entityCache = createLRUandFixedCache(
components.env.getConfig(EnvironmentConfig.ENTITIES_CACHE_SIZE),
cachedEntityntityTypes
)

const createEntityByPointersCache = (): Map<string, string | NotActiveEntity> => {
const entityIdByPointers = new Map<string, string | NotActiveEntity>()
return {
Expand All @@ -116,7 +164,7 @@ export function createActiveEntitiesComponent(
const entityIdByPointers = createEntityByPointersCache()

// init gauge metrics
components.metrics.observe('dcl_entities_cache_storage_max_size', {}, cache.max)
components.metrics.observe('dcl_entities_cache_storage_max_size', {}, entityCache.max)
Object.values(EntityType).forEach((entityType) => {
components.metrics.observe('dcl_entities_cache_storage_size', { entity_type: entityType }, 0)
})
Expand All @@ -133,9 +181,9 @@ export function createActiveEntitiesComponent(
// pointer now have a different active entity, let's update the old one
const entityId = entityIdByPointers.get(pointer)
if (isPointingToEntity(entityId)) {
const entity = cache.get(entityId) // it should be present
const entity = entityCache.get(entityId) // it should be present
if (isEntityPresent(entity)) {
cache.set(entityId, 'NOT_ACTIVE_ENTITY')
entityCache.set(entityId, 'NOT_ACTIVE_ENTITY')
for (const pointer of entity.pointers) {
entityIdByPointers.set(pointer, 'NOT_ACTIVE_ENTITY')
}
Expand All @@ -158,7 +206,7 @@ export function createActiveEntitiesComponent(
entityIdByPointers.set(pointer, isEntityPresent(entity) ? entity.id : entity)
}
if (isEntityPresent(entity)) {
cache.set(entity.id, entity)
entityCache.set(entity.id, entity)
components.metrics.increment('dcl_entities_cache_storage_size', { entity_type: entity.type })
// Store in the db the new entity pointed by pointers
await updateActiveDeployments(database, pointers, entity.id)
Expand Down Expand Up @@ -196,7 +244,7 @@ export function createActiveEntitiesComponent(
)

for (const entityId of entityIdsWithoutActiveEntity) {
cache.set(entityId, 'NOT_ACTIVE_ENTITY')
entityCache.set(entityId, 'NOT_ACTIVE_ENTITY')
logger.debug('entityId has no active entity', { entityId })
}
}
Expand Down Expand Up @@ -227,6 +275,19 @@ export function createActiveEntitiesComponent(
return entities
}

async function populateEntityType(database: DatabaseClient, entityType: EntityType): Promise<void> {
const deployments = await getDeploymentsForActiveEntities(database, undefined, undefined, entityType)

logger.info('Populating cache for entity type', { entityType, deployments: deployments.length })

for (const deployment of deployments) {
reportCacheAccess(deployment.entityType, 'miss')
}

const entities = mapDeploymentsToEntities(deployments)
await updateCache(database, entities, {})
}

/**
* Retrieve active entities by their ids
*/
Expand All @@ -236,7 +297,7 @@ export function createActiveEntitiesComponent(
const onCache: (Entity | NotActiveEntity)[] = []
const remaining: string[] = []
for (const entityId of uniqueEntityIds) {
const entity = cache.get(entityId)
const entity = entityCache.get(entityId)
if (entity) {
onCache.push(entity)
if (isEntityPresent(entity)) {
Expand Down Expand Up @@ -278,6 +339,8 @@ export function createActiveEntitiesComponent(
}
}

logger.info('Retrieving entities by pointers', { pointers: remaining.length })

// once we get the ids, retrieve from cache or find
const entityIds = Array.from(uniqueEntityIds.values())
const entitiesById = await withIds(database, entityIds)
Expand Down Expand Up @@ -313,7 +376,7 @@ export function createActiveEntitiesComponent(
for (const pointer of pointers) {
if (entityIdByPointers.has(pointer)) {
const entityId = entityIdByPointers.get(pointer)!
cache.set(entityId, 'NOT_ACTIVE_ENTITY')
entityCache.set(entityId, 'NOT_ACTIVE_ENTITY')
entityIdByPointers.set(pointer, 'NOT_ACTIVE_ENTITY')
}
}
Expand All @@ -322,7 +385,23 @@ export function createActiveEntitiesComponent(
function reset() {
entityIdByPointers.clear()
collectionUrnsByPrefixCache.clear()
cache.clear()
entityCache.clear()
}

async function initialize(database: DatabaseClient) {
logger.info('Initializing active entities cache', {
entityTypes: cachedEntityntityTypes.map((t) => t.toString()).toString()
})
for (const entityType of cachedEntityntityTypes) {
await populateEntityType(database, entityType)
}
logger.info('Active entities cache initialized')
}

function getAllCachedScenes() {
return Array.from(fixedCache.values()).filter(
(entity): entity is Entity => typeof entity !== 'string' && entity.type === EntityType.SCENE
)
}

return {
Expand All @@ -333,10 +412,11 @@ export function createActiveEntitiesComponent(
update,
clear,
clearPointers,

initialize,
getAllCachedScenes,
getCachedEntity(idOrPointer) {
if (cache.has(idOrPointer)) {
const cachedEntity = cache.get(idOrPointer)
if (entityCache.has(idOrPointer)) {
const cachedEntity = entityCache.get(idOrPointer)
return isEntityPresent(cachedEntity) ? cachedEntity.id : cachedEntity
}
return entityIdByPointers.get(idOrPointer)
Expand Down

0 comments on commit cf7e301

Please sign in to comment.