Skip to content

Commit

Permalink
feat: Update logic to support multiple processors running. Also, adde…
Browse files Browse the repository at this point in the history
…d the upsert call for Chain entity.

Small update in the .env file and jsdoc for utils function.
  • Loading branch information
brunomenezes committed Aug 22, 2024
1 parent 2d4e27c commit 681f0fb
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 57 deletions.
13 changes: 4 additions & 9 deletions .env
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
DB_NAME=squid
DB_PORT=23798
GQL_PORT=4350
# Check the README file for more information about the available environment variables.
CHAIN_IDS="1, 11155111"
# JSON-RPC node endpoint, both wss and https endpoints are accepted
CHAIN_ID="11155111"
# possible configurations
# RPC_URL_1=
# RPC_URL_11155111=
# RPC_URL_31337=
# RPC_URL_10=
# RPC_URL_11155420=
# RPC_URL_8453=
# RPC_URL_84532=
# RPC_RATE_LIMIT_1=
# NORMALLY FOR USE WITH LOCAL/DEV
# GENESIS_BLOCK_31337=22
# BLOCK_CONFIRMATIONS_31337=1
# BLOCK_CONFIRMATIONS_31337=0
111 changes: 64 additions & 47 deletions src/main.ts
Original file line number Diff line number Diff line change
@@ -1,59 +1,76 @@
import { createLogger } from '@subsquid/logger';
import { TypeormDatabase } from '@subsquid/typeorm-store';
import { Store, TypeormDatabase } from '@subsquid/typeorm-store';

import EventHandler from './handlers/EventHandler';
import { createProcessor } from './processor';
import { createProcessor, ProcessorContext } from './processor';
import { loadChainsToIndexFromEnvironment } from './utils';

const logger = createLogger('sqd:startup');

const defaultChainId = '31337';
if (!process.env.CHAIN_ID) {
const chainsToIndex = loadChainsToIndexFromEnvironment();
if (chainsToIndex.usingDefault)
logger.warn(
`Undefined environment variable CHAIN_ID, defaulting to ${defaultChainId}`,
`Could not find valid chains defined on CHAIN_IDS env var, defaulting to ${chainsToIndex.chains[0]}`,
);

let message = '';
if (chainsToIndex.chains.length > 1) {
message += `Starting processors for chains ${chainsToIndex.chains.join(
',',
)}`;
} else {
message = `Starting processor for chain ${chainsToIndex.chains[0]}`;
}
const chainId = parseInt(process.env.CHAIN_ID ?? defaultChainId);
logger.info(`Starting processor for chain ${chainId}...`);
logger.info(message);

// instantiate processor for chain
const processor = createProcessor(chainId);

processor.run(new TypeormDatabase({ supportHotBlocks: true }), async (ctx) => {
const eventHandler = new EventHandler();

for (const block of ctx.blocks) {
for (const log of block.logs) {
await eventHandler.handle(log, block, ctx);
}
}

const {
tokens,
applications,
factories,
deposits,
inputs,
nfts,
erc721Deposits,
multiTokens,
erc1155Deposits,
} = eventHandler.getValues();

const total = eventHandler.getTotalHandled();

if (total > 0) {
ctx.log.info(
`Flushing ${total} entities: ${eventHandler.getSummary()}`,
);
}

await ctx.store.upsert([...multiTokens.values()]);
await ctx.store.upsert([...tokens.values()]);
await ctx.store.upsert([...nfts.values()]);
await ctx.store.upsert([...factories.values()]);
await ctx.store.upsert([...applications.values()]);
await ctx.store.upsert([...deposits.values()]);
await ctx.store.upsert([...erc721Deposits.values()]);
await ctx.store.upsert([...erc1155Deposits.values()]);
await ctx.store.upsert([...inputs.values()]);
chainsToIndex.chains.forEach((chainId: number) => {
const processor = createProcessor(chainId);
processor.run(
new TypeormDatabase({
supportHotBlocks: true,
stateSchema: `processor-${chainId}`,
}),
async (ctx: ProcessorContext<Store>) => {
const eventHandler = new EventHandler();

for (const block of ctx.blocks) {
for (const log of block.logs) {
await eventHandler.handle(log, block, ctx);
}
}

const {
tokens,
applications,
factories,
deposits,
inputs,
nfts,
erc721Deposits,
multiTokens,
erc1155Deposits,
chains,
} = eventHandler.getValues();

const total = eventHandler.getTotalHandled();

if (total > 0) {
ctx.log.info(
`Flushing ${total} entities: ${eventHandler.getSummary()}`,
);
}

await ctx.store.upsert([...chains.values()]);
await ctx.store.upsert([...multiTokens.values()]);
await ctx.store.upsert([...tokens.values()]);
await ctx.store.upsert([...nfts.values()]);
await ctx.store.upsert([...factories.values()]);
await ctx.store.upsert([...applications.values()]);
await ctx.store.upsert([...deposits.values()]);
await ctx.store.upsert([...erc721Deposits.values()]);
await ctx.store.upsert([...erc1155Deposits.values()]);
await ctx.store.upsert([...inputs.values()]);
},
);
});
8 changes: 7 additions & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,13 @@ export const supportedChains = new Set<number>([

/**
* Read the environment variable CHAIN_IDS and filter by
* supported chains, if none is found 31337 is returned as default.
* supported chains, if none the chain 31337 is returned as default.
* @example
* // Return when multiple supported chains are defined.
* {chains: [1, 10, 11155111], usingDefault: false}
* @example
* // Return when no valid chains are defined.
* { chains: [31337], usingDefault: true}
* @returns {number[]}
*/
export function loadChainsToIndexFromEnvironment() {
Expand Down

0 comments on commit 681f0fb

Please sign in to comment.