diff --git a/Classes/Queue/RedisQueue.php b/Classes/Queue/RedisQueue.php index 34fedd7..bae833f 100644 --- a/Classes/Queue/RedisQueue.php +++ b/Classes/Queue/RedisQueue.php @@ -94,7 +94,7 @@ public function submit($payload, array $options = []) $messageId = Algorithms::generateUUID(); $idStored = $this->client->hSet("queue:{$this->name}:ids", $messageId, json_encode($payload)); if ($idStored === 0) { - return null; + throw new JobQueueException(sprintf('Duplicate message id: "%s"', $messageId), 1470656350); } $this->client->lPush("queue:{$this->name}:messages", $messageId); @@ -141,10 +141,11 @@ public function waitAndReserve($timeout = null) public function release($messageId, array $options = []) { $this->checkClientConnection(); - $this->client->lRem("queue:{$this->name}:processing", $messageId, 0); - $numberOfReleases = (integer)$this->client->hGet("queue:{$this->name}:releases", $messageId); - $this->client->hSet("queue:{$this->name}:releases", $messageId, $numberOfReleases + 1); - $this->client->lPush("queue:{$this->name}:messages", $messageId); + $this->client->multi() + ->lRem("queue:{$this->name}:processing", $messageId, 0) + ->hIncrBy("queue:{$this->name}:releases", $messageId, 1) + ->lPush("queue:{$this->name}:messages", $messageId) + ->exec(); } /** @@ -165,9 +166,10 @@ public function abort($messageId) public function finish($messageId) { $this->checkClientConnection(); + $numberOfRemoved = $this->client->lRem("queue:{$this->name}:processing", $messageId, 0); $this->client->hDel("queue:{$this->name}:ids", $messageId); $this->client->hDel("queue:{$this->name}:releases", $messageId); - return $this->client->lRem("queue:{$this->name}:processing", $messageId, 0) > 0; + return $numberOfRemoved > 0; } /**