From 20085feef61119b58fef540cba79595b4a85a89a Mon Sep 17 00:00:00 2001 From: Mior Muhammad Zaki Date: Sat, 11 Nov 2023 06:47:14 +0800 Subject: [PATCH] [10.x] Fixes Batch Callbacks not triggering if job timeout while in transaction (#48961) * wip Signed-off-by: Mior Muhammad Zaki * wip Signed-off-by: Mior Muhammad Zaki * wip Signed-off-by: Mior Muhammad Zaki * wip Signed-off-by: Mior Muhammad Zaki * wip Signed-off-by: Mior Muhammad Zaki * wip Signed-off-by: Mior Muhammad Zaki * wip Signed-off-by: Mior Muhammad Zaki * wip Signed-off-by: Mior Muhammad Zaki * wip Signed-off-by: Mior Muhammad Zaki * Apply fixes from StyleCI * wip Signed-off-by: Mior Muhammad Zaki * wip Signed-off-by: Mior Muhammad Zaki * Apply fixes from StyleCI * wip Signed-off-by: Mior Muhammad Zaki * Apply fixes from StyleCI * wip Signed-off-by: Mior Muhammad Zaki * wip Signed-off-by: Mior Muhammad Zaki * wip Signed-off-by: Mior Muhammad Zaki * Update BatchableTransactionTest.php * Apply fixes from StyleCI * wip * wip Signed-off-by: Mior Muhammad Zaki * formatting * Apply fixes from StyleCI * formatting * formatting * Apply fixes from StyleCI * fix test --------- Signed-off-by: Mior Muhammad Zaki Co-authored-by: StyleCI Bot Co-authored-by: Taylor Otwell --- src/Illuminate/Bus/BatchRepository.php | 3 + .../Bus/DatabaseBatchRepository.php | 10 +++ src/Illuminate/Queue/Jobs/Job.php | 23 +++++++ .../Queue/BatchableTransactionTest.php | 65 +++++++++++++++++++ .../Fixtures/TimeOutJobWithTransaction.php | 22 +++++++ tests/Queue/QueueBeanstalkdJobTest.php | 2 +- 6 files changed, 124 insertions(+), 1 deletion(-) create mode 100644 tests/Integration/Database/Queue/BatchableTransactionTest.php create mode 100644 tests/Integration/Database/Queue/Fixtures/TimeOutJobWithTransaction.php diff --git a/src/Illuminate/Bus/BatchRepository.php b/src/Illuminate/Bus/BatchRepository.php index 098ccef20ed6..0e580ca4dcd9 100644 --- a/src/Illuminate/Bus/BatchRepository.php +++ b/src/Illuminate/Bus/BatchRepository.php @@ -4,6 +4,9 @@ use Closure; +/** + * @method void rollBack() + */ interface BatchRepository { /** diff --git a/src/Illuminate/Bus/DatabaseBatchRepository.php b/src/Illuminate/Bus/DatabaseBatchRepository.php index c68e5c1a5fdd..4333c515ac79 100644 --- a/src/Illuminate/Bus/DatabaseBatchRepository.php +++ b/src/Illuminate/Bus/DatabaseBatchRepository.php @@ -312,6 +312,16 @@ public function transaction(Closure $callback) return $this->connection->transaction(fn () => $callback()); } + /** + * Rollback the last database transaction for the connection. + * + * @return void + */ + public function rollBack() + { + $this->connection->rollBack(); + } + /** * Serialize the given value. * diff --git a/src/Illuminate/Queue/Jobs/Job.php b/src/Illuminate/Queue/Jobs/Job.php index d5464946b7ee..67a80e50c03e 100755 --- a/src/Illuminate/Queue/Jobs/Job.php +++ b/src/Illuminate/Queue/Jobs/Job.php @@ -2,10 +2,14 @@ namespace Illuminate\Queue\Jobs; +use Illuminate\Bus\Batchable; +use Illuminate\Bus\BatchRepository; use Illuminate\Contracts\Events\Dispatcher; use Illuminate\Queue\Events\JobFailed; use Illuminate\Queue\ManuallyFailedException; +use Illuminate\Queue\TimeoutExceededException; use Illuminate\Support\InteractsWithTime; +use Throwable; abstract class Job { @@ -183,6 +187,25 @@ public function fail($e = null) return; } + $commandName = $this->payload()['data']['commandName'] ?? false; + + // If the exception is due to a job timing out, we need to rollback the current + // database transaction so that the failed job count can be incremented with + // the proper value. Otherwise, the current transaction will never commit. + if ($e instanceof TimeoutExceededException && + $commandName && + in_array(Batchable::class, class_uses_recursive($commandName))) { + $batchRepository = $this->resolve(BatchRepository::class); + + if (method_exists($batchRepository, 'rollBack')) { + try { + $batchRepository->rollBack(); + } catch (Throwable $e) { + // ... + } + } + } + try { // If the job has failed, we will delete it, call the "failed" method and then call // an event indicating the job has failed so it can be logged if needed. This is diff --git a/tests/Integration/Database/Queue/BatchableTransactionTest.php b/tests/Integration/Database/Queue/BatchableTransactionTest.php new file mode 100644 index 000000000000..46c1f66366fa --- /dev/null +++ b/tests/Integration/Database/Queue/BatchableTransactionTest.php @@ -0,0 +1,65 @@ +get('database.default') === 'testing') { + $this->markTestSkipped('Test does not support using :memory: database connection'); + } + + $config->set(['queue.default' => 'database']); + } + + public function testItCanHandleTimeoutJob() + { + Bus::batch([new Fixtures\TimeOutJobWithTransaction()]) + ->allowFailures() + ->dispatch(); + + $this->assertSame(1, DB::table('jobs')->count()); + $this->assertSame(0, DB::table('failed_jobs')->count()); + $this->assertSame(1, DB::table('job_batches')->count()); + + try { + remote('queue:work --stop-when-empty', [ + 'DB_CONNECTION' => config('database.default'), + 'QUEUE_CONNECTION' => config('queue.default'), + ])->run(); + } catch (Throwable $e) { + $this->assertInstanceOf(ProcessSignaledException::class, $e); + $this->assertSame('The process has been signaled with signal "9".', $e->getMessage()); + } + + $this->assertSame(0, DB::table('jobs')->count()); + $this->assertSame(1, DB::table('failed_jobs')->count()); + + $this->assertDatabaseHas('job_batches', [ + 'total_jobs' => 1, + 'pending_jobs' => 1, + 'failed_jobs' => 1, + 'failed_job_ids' => json_encode(DB::table('failed_jobs')->pluck('uuid')->all()), + ]); + } +} diff --git a/tests/Integration/Database/Queue/Fixtures/TimeOutJobWithTransaction.php b/tests/Integration/Database/Queue/Fixtures/TimeOutJobWithTransaction.php new file mode 100644 index 000000000000..cff613dcc63d --- /dev/null +++ b/tests/Integration/Database/Queue/Fixtures/TimeOutJobWithTransaction.php @@ -0,0 +1,22 @@ + sleep(20)); + } +} diff --git a/tests/Queue/QueueBeanstalkdJobTest.php b/tests/Queue/QueueBeanstalkdJobTest.php index ebea82a7de38..ec9bd34277c4 100755 --- a/tests/Queue/QueueBeanstalkdJobTest.php +++ b/tests/Queue/QueueBeanstalkdJobTest.php @@ -33,7 +33,7 @@ public function testFireProperlyCallsTheJobHandler() public function testFailProperlyCallsTheJobHandler() { $job = $this->getJob(); - $job->getPheanstalkJob()->shouldReceive('getData')->once()->andReturn(json_encode(['job' => 'foo', 'uuid' => 'test-uuid', 'data' => ['data']])); + $job->getPheanstalkJob()->shouldReceive('getData')->andReturn(json_encode(['job' => 'foo', 'uuid' => 'test-uuid', 'data' => ['data']])); $job->getContainer()->shouldReceive('make')->once()->with('foo')->andReturn($handler = m::mock(BeanstalkdJobTestFailedTest::class)); $job->getPheanstalk()->shouldReceive('delete')->once()->with($job->getPheanstalkJob())->andReturnSelf(); $handler->shouldReceive('failed')->once()->with(['data'], m::type(Exception::class), 'test-uuid');