Skip to content

Commit

Permalink
feat: Allow cancelling jobs from client (#91)
Browse files Browse the repository at this point in the history
Co-authored-by: Evan Strat <[email protected]>
  • Loading branch information
evan10s and evan10s authored Feb 8, 2024
1 parent 46af4ff commit 33f6151
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 55 deletions.
45 changes: 44 additions & 1 deletion client/src/components/jobs/JobListItem.vue
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,61 @@
:href="`https://www.youtube.com/watch?v=${job.youTubeVideoId}`"
target="_blank"
/>
<VBtn v-else-if="[WorkerJobStatus.PENDING, WorkerJobStatus.FAILED_RETRYABLE].includes(job.status)
&& !showConfirmCancel"
variant="text"
color="gray"
icon="mdi-trash-can-outline"
@click="showConfirmCancel = true"
/>
<VBtn v-else-if="[WorkerJobStatus.PENDING, WorkerJobStatus.FAILED_RETRYABLE].includes(job.status)
&& showConfirmCancel"
variant="text"
prepend-icon="mdi-trash-can-outline"
color="error"
:loading="cancelJobLoading"
text="Cancel?"
@click="cancelJob"
/>
</template>
</VListItem>
</template>
<script lang="ts" setup>
import { WorkerJob, WorkerJobStatus, workerJobStatusToUiString } from "@/types/WorkerJob";
import { computed } from "vue";
import { computed, ref, watch } from "vue";
import { capitalizeFirstLetter } from "@/util/capitalize";
import { useWorkerStore } from "@/stores/worker";
const props = defineProps<{
job: WorkerJob
}>();
const workerStore = useWorkerStore();
const showConfirmCancel = ref(false);
const showConfirmTimeoutId = ref<number | null>(null);
watch(showConfirmCancel, (newVal) => {
if (!newVal) {
return;
}
showConfirmTimeoutId.value = setTimeout(() => {
showConfirmCancel.value = false;
}, 5000) as unknown as number; // Browsers return a number from setTimeout but TypeScript is interpreting this
// as a Node function, see https://stackoverflow.com/a/22747243
});
const cancelJobLoading = ref(false);
async function cancelJob() {
if (showConfirmTimeoutId.value) {
clearTimeout(showConfirmTimeoutId.value);
}
cancelJobLoading.value = true;
await workerStore.cancelJob(props.job.jobId);
cancelJobLoading.value = false;
showConfirmCancel.value = false;
}
const subtitle = computed(() => {
let baseSubtitle = capitalizeFirstLetter(workerJobStatusToUiString(props.job.status));
if (props.job.error) {
Expand Down
9 changes: 9 additions & 0 deletions client/src/components/jobs/JobsList.vue
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
<template>
<VAlert v-if="workerStore.jobCancellationError"
class="mt-2"
color="error"
>
{{ workerStore.jobCancellationError }}
</VAlert>
<VList>
<div v-for="job in jobsList" :key="job.jobId">
<JobListItem :job="job" />
Expand All @@ -8,6 +14,9 @@
<script lang="ts" setup>
import {WorkerJob} from "@/types/WorkerJob";
import JobListItem from "@/components/jobs/JobListItem.vue";
import { useWorkerStore } from "@/stores/worker";
const workerStore = useWorkerStore();
const props = defineProps<{
jobsList: WorkerJob[]
Expand Down
2 changes: 1 addition & 1 deletion client/src/stores/match.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ export const useMatchStore = defineStore("match", () => {
const allMatchVideosUploaded = computed(() => {
return matchVideos.value.length > 0 &&
matchVideos.value.every(
video => workerStore.jobHasStatus(video.workerJobId,[WorkerJobStatus.COMPLETED, WorkerJobStatus.FAILED]),
video => workerStore.jobHasStatus(video.workerJobId,[WorkerJobStatus.COMPLETED]),
);
});

Expand Down
22 changes: 22 additions & 0 deletions client/src/stores/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,32 @@ export const useWorkerStore = defineStore("worker", () => {
});
}

const jobCancellationError = ref("");
async function cancelJob(jobId: string) {
jobCancellationError.value = "";

const result = await fetch("/api/v1/worker/jobs/cancel", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
jobId,
reason: "Cancelled by user",
}),
});

if (!result.ok) {
jobCancellationError.value = `API error (${result.status} ${result.statusText}): Error deleting job #${jobId}`;
}
}

return {
bindEvents,
cancelJob,
events,
isConnected,
jobCancellationError,
jobCountsByStatus,
jobHasStatus,
jobs,
Expand Down
69 changes: 21 additions & 48 deletions server/src/routes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import Paths from "@src/routes/constants/Paths";
import { type IReq, type IRes } from "@src/routes/types/types";
import { graphileWorkerUtils, prisma } from "@src/server";
import { body, matchedData, param, validationResult } from "express-validator";
import logger from "jet-logger";
import { type JobStatus } from "@prisma/client";
import { JobStatus } from "@prisma/client";
import { cancelJob } from "@src/services/WorkerService";

export const workerRouter = Router();
export const workerJobsRouter = Router();
Expand Down Expand Up @@ -107,9 +107,9 @@ async function forceUnlockWorkers(req: IReq, res: IRes): Promise<IRes> {

workerJobsRouter.post(
Paths.Worker.Jobs.Cancel,
body("jobIds").isArray(),
body("jobId").isString(),
body("reason").isString(),
permanentlyFailJobs,
permanentlyFailJob,
);

/**
Expand All @@ -119,7 +119,7 @@ workerJobsRouter.post(
* @param req
* @param res
*/
async function permanentlyFailJobs(req: IReq, res: IRes): Promise<IRes> {
async function permanentlyFailJob(req: IReq, res: IRes): Promise<IRes> {
const errors = validationResult(req);

if (!errors.isEmpty()) {
Expand All @@ -129,55 +129,28 @@ async function permanentlyFailJobs(req: IReq, res: IRes): Promise<IRes> {
});
}

const { jobIds, reason } = matchedData(req);
const { jobId, reason } = matchedData(req);

const [graphileResult, prismaResult] = await Promise.all([
graphileWorkerUtils.permanentlyFailJobs(jobIds as string[], reason as string),
prisma.workerJob.updateMany({
where: {
jobId: {
in: jobIds as string[],
},
},
data: {
status: "FAILED",
error: reason as string,
},
}),
]);

if (graphileResult.length !== prismaResult.count) {
return res.json({
success: false,
message: "Graphile and Prisma job fail results have different lengths, so not all jobs requested were " +
"actually cancelled.",
prismaUpdateCount: prismaResult.count,
graphileResult,
});
}
const workerJob = await prisma.workerJob.findUnique({
where: {
jobId: jobId as string,
},
});

if (graphileResult.length === 0) {
return res.json({
if (!workerJob) {
return res.status(404).json({
success: false,
message: "No jobs cancelled because none of the job IDs were found in the database",
error: `Job #${jobId} not found`,
});
}

if (graphileResult.length !== (jobIds as string[]).length) {
logger.warn(
`Graphile response indicates some jobs were not actually cancelled: ${JSON.stringify(graphileResult)}`,
);
return res.json({
success: false,
message: "Not all jobs requested were actually cancelled",
cancelled: (jobIds as string[]).filter(
(id: string) => graphileResult
.find(result => result.id === id) !== undefined),
notCancelled: (jobIds as string[]).filter(
(id: string) => graphileResult
.find(result => result.id === id) === undefined),
});
if (workerJob.status === JobStatus.PENDING || workerJob.status === JobStatus.FAILED_RETRYABLE) {
await cancelJob(jobId as string, reason as string);
return res.json({ success: true });
}

return res.json({ success: true, updated: prismaResult.count });
return res.status(400).json({
success: false,
error: `Job cannot be cancelled from ${workerJob.status} status`,
});
}
30 changes: 29 additions & 1 deletion server/src/services/WorkerService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,41 @@ export async function queueJob(jobSummary: string,
});

io.emit("worker", {
event: "worker:job:created",
event: "worker:job:created", // TODO: Use constant, change created->create
workerJob: upsertResult,
});

return upsertResult;
}

export async function cancelJob(jobId: string, reason: string): Promise<void> {
const [graphileResult, workerJob] = await Promise.all([
graphileWorkerUtils.permanentlyFailJobs([jobId], reason),
prisma.workerJob.update({
where: {
jobId,
},
data: {
status: "FAILED",
error: reason,
},
}),
]);

if (graphileResult.length !== 1) {
logger.err(`Error cancelling job with ID ${jobId}: Graphile permanent fail request did not return a job ID` +
" (see graphileResult below)");
logger.err(graphileResult);
throw new Error("Graphile permanent fail request was unsuccessful (check the logs for more details)");
}

// Manually send the WebSocket event since the job was cancelled because Graphile won't send an event in this case
io.emit("worker", {
event: WORKER_JOB_COMPLETE,
workerJob,
});
}

async function handleWorkerJobStart(data: ClientToServerEvents[typeof WORKER_JOB_START]): Promise<void> {
// Because we're receiving websocket data from a client, we can't be 100% sure the data truly matches the typing
if (!isWorkerJobStartEvent(data)) {
Expand Down
5 changes: 1 addition & 4 deletions server/src/tasks/types/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ export interface CommonEvents {
payload: unknown;
};
[WORKER_JOB_COMPLETE]: {
workerId: string,
workerId: string | null,
jobId: string,
jobName: string,
attempts: number,
maxAttempts: number,
payload: unknown,
Expand Down Expand Up @@ -44,9 +43,7 @@ export function isWorkerJobStartEvent(x: unknown): x is CommonEvents[typeof WORK
export function isWorkerJobCompleteEvent(x: unknown): x is CommonEvents[typeof WORKER_JOB_COMPLETE] {
return typeof x === "object" &&
x !== null &&
"workerId" in x &&
"jobId" in x &&
"jobName" in x &&
"attempts" in x &&
"maxAttempts" in x &&
"payload" in x &&
Expand Down

0 comments on commit 33f6151

Please sign in to comment.