Skip to content

Commit

Permalink
feat: add worker timeout
Browse files Browse the repository at this point in the history
setting timeout as 1 minute
TODO: handle memory leaks from stalled jobs
  • Loading branch information
wei committed Nov 21, 2024
1 parent 511832e commit 9b6357e
Showing 1 changed file with 37 additions and 14 deletions.
51 changes: 37 additions & 14 deletions src/processor/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,38 @@
import type { Job } from "bullmq";
import type { SchedulerJobData } from "@wei/probot-scheduler";
import type { Probot } from "probot";
import type { Logger, Probot, ProbotOctokit } from "probot";
import logger from "@/src/utils/logger.ts";
import { getPullConfig } from "@/src/utils/get-pull-config.ts";
import { Pull } from "@/src/processor/pull.ts";

const TIMEOUT = 60 * 1000;

function createTimeoutPromise(log: Logger) {
return new Promise((_, reject) => {
setTimeout(() => {
log.warn("⏰ Job timed out after 1 minute");
reject(new Error("Job timed out after 1 minute"));
}, TIMEOUT);
});
}

async function processRepo(
octokit: ProbotOctokit,
jobData: SchedulerJobData,
log: Logger,
) {
const { owner, repo } = jobData;

const config = await getPullConfig(octokit, log, jobData);
if (!config) {
log.info(`⚠️ No config found, skipping`);
return;
}

const pull = new Pull(octokit, { owner, repo, logger: log }, config);
await pull.routineCheck();
}

export function getRepoProcessor(probot: Probot) {
return async function RepoJobProcessor(job: Job<SchedulerJobData>) {
const log = logger.child({
Expand All @@ -14,23 +42,18 @@ export function getRepoProcessor(probot: Probot) {

log.info("🏃 Processing repo job");

const { installation_id, owner, repo } = job.data;

try {
const octokit = await probot.auth(installation_id);

const config = await getPullConfig(octokit, log, job.data);
if (!config) {
log.info(`⚠️ No config found, skipping`);
return;
}
const octokit = await probot.auth(job.data.installation_id);

const pull = new Pull(octokit, { owner, repo, logger: log }, config);
await pull.routineCheck();
await Promise.race([
processRepo(octokit, job.data, log),
createTimeoutPromise(log),
]);

log.info(`✅ Repo job ${job.id} processed successfully`);
log.info(`✅ Repo job processed successfully`);
} catch (error) {
log.error(error, "❌ Repo job failed");
const message = error instanceof Error ? error.message : "Unknown error";
log.error(error, `❌ Repo job failed: ${message}`);
}
};
}

0 comments on commit 9b6357e

Please sign in to comment.