diff --git a/Classes/Queue/RedisQueue.php b/Classes/Queue/RedisQueue.php index e8846d3..769fa68 100644 --- a/Classes/Queue/RedisQueue.php +++ b/Classes/Queue/RedisQueue.php @@ -13,6 +13,8 @@ use Flowpack\JobQueue\Common\Queue\Message; use Flowpack\JobQueue\Common\Queue\QueueInterface; +use TYPO3\Flow\Utility\Algorithms; +use Flowpack\JobQueue\Common\Exception as JobQueueException; /** * A queue implementation using Redis as the queue backend @@ -58,51 +60,49 @@ class RedisQueue implements QueueInterface protected $maxReconnectDelay = 30.0; /** - * Constructor - * * @param string $name * @param array $options + * @throws JobQueueException */ - public function __construct($name, array $options = array()) + public function __construct($name, array $options = []) { $this->name = $name; if (isset($options['defaultTimeout'])) { $this->defaultTimeout = (integer)$options['defaultTimeout']; } - $this->clientOptions = isset($options['client']) ? $options['client'] : array(); - + $this->clientOptions = isset($options['client']) ? $options['client'] : []; $this->client = new \Redis(); if (!$this->connectClient()) { - throw new \Flowpack\JobQueue\Common\Exception('Could not connect to Redis', 1467382685); + throw new JobQueueException('Could not connect to Redis', 1467382685); } } /** - * Submit a message to the queue - * - * @param Message $message - * @return void + * @inheritdoc + */ + public function getName() + { + return $this->name; + } + + /** + * @inheritdoc */ - public function submit(Message $message) + public function submit($payload, array $options = []) { $this->checkClientConnection(); - if ($message->getIdentifier() !== null) { - $added = $this->client->sAdd("queue:{$this->name}:ids", $message->getIdentifier()); - if (!$added) { - return; - } + $messageId = Algorithms::generateUUID(); + $idStored = $this->client->hSet("queue:{$this->name}:ids", $messageId, json_encode($payload)); + if ($idStored === 0) { + return null; } - $encodedMessage = $this->encodeMessage($message); - $this->client->lPush("queue:{$this->name}:messages", $encodedMessage); - $message->setState(Message::STATE_SUBMITTED); + + $this->client->lPush("queue:{$this->name}:messages", $messageId); + return $messageId; } /** - * Wait for a message in the queue and return the message for processing - * (without safety queue) - * - * @param int $timeout - * @return Message The received message or NULL if a timeout occurred + * @inheritdoc */ public function waitAndTake($timeout = null) { @@ -111,33 +111,19 @@ public function waitAndTake($timeout = null) } $this->checkClientConnection(); $keyAndValue = $this->client->brPop("queue:{$this->name}:messages", $timeout); - $value = isset($keyAndValue[1]) ? $keyAndValue[1] : null; - if (is_string($value)) { - $message = $this->decodeMessage($value); - - if ($message->getIdentifier() !== null) { - $this->client->sRem("queue:{$this->name}:ids", $message->getIdentifier()); - } - - // The message is marked as done - $message->setState(Message::STATE_DONE); - - return $message; - } else { + $messageId = isset($keyAndValue[1]) ? $keyAndValue[1] : null; + if ($messageId === null) { return null; } + $message = $this->getMessageById($messageId); + if ($message !== null) { + $this->client->hDel("queue:{$this->name}:ids", $messageId); + } + return $message; } /** - * Wait for a message in the queue and save the message to a safety queue - * - * TODO: Idea for implementing a TTR (time to run) with monitoring of safety queue. E.g. - * use different queue names with encoded times? With "brpoplpush" we cannot modify the - * queued item on transfer to the safety queue and we cannot update a timestamp to mark - * the run start time in the message, so separate keys should be used for this. - * - * @param int $timeout - * @return Message + * @inheritdoc */ public function waitAndReserve($timeout = null) { @@ -145,115 +131,108 @@ public function waitAndReserve($timeout = null) $timeout = $this->defaultTimeout; } $this->checkClientConnection(); - $value = $this->client->brpoplpush("queue:{$this->name}:messages", "queue:{$this->name}:processing", $timeout); - if (is_string($value)) { - $message = $this->decodeMessage($value); - if ($message->getIdentifier() !== null) { - $this->client->sRem("queue:{$this->name}:ids", $message->getIdentifier()); - } - return $message; - } else { - return null; - } + $messageId = $this->client->brpoplpush("queue:{$this->name}:messages", "queue:{$this->name}:processing", $timeout); + return $this->getMessageById($messageId); } /** - * Mark a message as finished - * - * @param Message $message - * @return boolean TRUE if the message could be removed + * @inheritdoc + */ + public function release($messageId, array $options = []) + { + $this->checkClientConnection(); + $this->client->lRem("queue:{$this->name}:processing", $messageId, 0); + $numberOfReleases = (integer)$this->client->hGet("queue:{$this->name}:releases", $messageId); + $this->client->hSet("queue:{$this->name}:releases", $messageId, $numberOfReleases + 1); + $this->client->lPush("queue:{$this->name}:messages", $messageId); + } + + /** + * @inheritdoc */ - public function finish(Message $message) + public function abort($messageId) { $this->checkClientConnection(); - $originalValue = $message->getOriginalValue(); - $success = $this->client->lRem("queue:{$this->name}:processing", $originalValue, 0) > 0; - if ($success) { - $message->setState(Message::STATE_DONE); + $numberOfRemoved = $this->client->lRem("queue:{$this->name}:processing", $messageId, 0); + if ($numberOfRemoved === 1) { + $this->client->lPush("queue:{$this->name}:failed", $messageId); } - return $success; } /** - * Peek for messages - * - * @param integer $limit - * @return Message[] Messages or empty array if no messages were present + * @inheritdoc + */ + public function finish($messageId) + { + $this->checkClientConnection(); + $this->client->hDel("queue:{$this->name}:ids", $messageId); + $this->client->hDel("queue:{$this->name}:releases", $messageId); + return $this->client->lRem("queue:{$this->name}:processing", $messageId, 0) > 0; + } + + /** + * @inheritdoc */ public function peek($limit = 1) { $this->checkClientConnection(); $result = $this->client->lRange("queue:{$this->name}:messages", -($limit), -1); - if (is_array($result) && count($result) > 0) { - $messages = array(); - foreach ($result as $value) { - $message = $this->decodeMessage($value); - // The message is still submitted and should not be processed! - $message->setState(Message::STATE_SUBMITTED); - $messages[] = $message; - } - return $messages; + if (!is_array($result) || count($result) === 0) { + return []; + } + $messages = []; + foreach ($result as $messageId) { + $encodedPayload = $this->client->hGet("queue:{$this->name}:ids", $messageId); + $messages[] = new Message($messageId, json_decode($encodedPayload, true)); } - return array(); + return $messages; } /** - * Count messages in the queue - * - * @return integer + * @inheritdoc */ public function count() { $this->checkClientConnection(); - $count = $this->client->lLen("queue:{$this->name}:messages"); - return $count; + return $this->client->lLen("queue:{$this->name}:messages"); } /** - * Encode a message - * - * Updates the original value property of the message to resemble the - * encoded representation. - * - * @param Message $message - * @return string + * @return void */ - protected function encodeMessage(Message $message) + public function setUp() { - $value = json_encode($message->toArray()); - $message->setOriginalValue($value); - return $value; + $this->checkClientConnection(); } /** - * Decode a message from a string representation - * - * @param string $value - * @return Message + * @inheritdoc */ - protected function decodeMessage($value) + public function flush() { - $decodedMessage = json_decode($value, true); - $message = new Message($decodedMessage['payload']); - if (isset($decodedMessage['identifier'])) { - $message->setIdentifier($decodedMessage['identifier']); - } - $message->setOriginalValue($value); - return $message; + $this->checkClientConnection(); + $this->client->flushDB(); } /** - * - * @param string $identifier + * @param string $messageId * @return Message */ - public function getMessage($identifier) + protected function getMessageById($messageId) { - return null; + if (!is_string($messageId)) { + return null; + } + $encodedPayload = $this->client->hGet("queue:{$this->name}:ids", $messageId); + $numberOfReleases = (integer)$this->client->hGet("queue:{$this->name}:releases", $messageId); + return new Message($messageId, json_decode($encodedPayload, true), $numberOfReleases); } /** * Check if the Redis client connection is still up and reconnect if Redis was disconnected + * + * @return void + * @throws JobQueueException */ protected function checkClientConnection() { @@ -268,7 +247,7 @@ protected function checkClientConnection() } if ($reconnect) { if (!$this->connectClient()) { - throw new \Flowpack\JobQueue\Common\Exception('Could not connect to Redis', 1467382685); + throw new JobQueueException('Could not connect to Redis', 1467382685); } } } @@ -286,11 +265,9 @@ protected function connectClient() $host = isset($this->clientOptions['host']) ? $this->clientOptions['host'] : '127.0.0.1'; $port = isset($this->clientOptions['port']) ? $this->clientOptions['port'] : 6379; $database = isset($this->clientOptions['database']) ? $this->clientOptions['database'] : 0; - // The connection read timeout should be higher than the timeout for blocking operations! $timeout = isset($this->clientOptions['timeout']) ? $this->clientOptions['timeout'] : round($this->defaultTimeout * 1.5); $connected = $this->client->connect($host, $port, $timeout) && $this->client->select($database); - // Break the cycle that could cause a high CPU load if (!$connected) { usleep($this->reconnectDelay * 1e6); @@ -298,8 +275,6 @@ protected function connectClient() } else { $this->reconnectDelay = 1.0; } - return $connected; } - } \ No newline at end of file diff --git a/CodeOfConduct.rst b/CodeOfConduct.rst new file mode 100644 index 0000000..681f084 --- /dev/null +++ b/CodeOfConduct.rst @@ -0,0 +1,23 @@ +Contributor Code of Conduct +--------------------------- + +As contributors and maintainers of this project, and in the interest of fostering an open and welcoming community, we pledge to respect all people who contribute through reporting issues, posting feature requests, updating documentation, submitting pull requests or patches, and other activities. + +We are committed to making participation in this project a harassment-free experience for everyone, regardless of level of experience, gender, gender identity and expression, sexual orientation, disability, personal appearance, body size, race, ethnicity, age, religion, or nationality. + +Examples of unacceptable behavior by participants include: + +* The use of sexualized language or imagery +* Personal attacks +* Trolling or insulting/derogatory comments +* Public or private harassment +* Publishing other's private information, such as physical or electronic addresses, without explicit permission +* Other unethical or unprofessional conduct. + +Project maintainers have the right and responsibility to remove, edit, or reject comments, commits, code, wiki edits, issues, and other contributions that are not aligned to this Code of Conduct. By adopting this Code of Conduct, project maintainers commit themselves to fairly and consistently applying these principles to every aspect of managing this project. Project maintainers who do not follow or enforce the Code of Conduct may be permanently removed from the project team. + +This code of conduct applies both within project spaces and in public spaces when an individual is representing the project or its community. + +Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by opening an issue or contacting one or more of the project maintainers. + +This Code of Conduct is adapted from the `Contributor Covenant `_, version 1.2.0, available at (http://contributor-covenant.org/version/1/2/0/ diff --git a/LICENSE b/LICENSE index bdeff91..764d528 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2015 Neos project contributors +Copyright (c) 2016 Neos project contributors Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md new file mode 100644 index 0000000..cbb946a --- /dev/null +++ b/README.md @@ -0,0 +1,58 @@ +# Flowpack.JobQueue.Redis + +A job queue backend for the [Flowpack.JobQueue.Common](https://github.com/Flowpack/jobqueue-common) package based on [redis](http://redis.io/). + +## Usage + +Install the package using composer: + +``` +composer require flowpack/jobqueue-redis +``` + +If not already installed, that will fetch its requirements, namely `jobqueue-common`. +*NOTE:* This package needs a [redis](http://redis.io/) server and the [PHP redis extension](https://github.com/phpredis/phpredis) to be installed + +Now the queue can be configured like this: + +```yaml +Flowpack: + JobQueue: + Common: + queues: + 'some-queue': + className: 'Flowpack\JobQueue\Redis\Queue\RedisQueue' + executeIsolated: true + options: + client: + host: 127.0.0.1 + port: 6379 + database: 15 + defaultTimeout: 20 +``` + +## Specific options + + +The `RedisQueue` supports following options: + +| Option | Type | Default | Description | +| ----------------------- |---------| ---------------------------------------------------------------------------------------------:| ---------------------------------------- | +| defaultTimeout | integer | 60 | Number of seconds new messages are waited for before a timeout occurs (This is overridden by a "timeout" argument in the `waitAndTake()` and `waitAndReserve()` methods | +| client | array | ['host' => '127.0.0.1', 'port' => 6379, 'database' => 0, 'timeout' => ] | Redis connection settings | + +### Submit options + +The `RedisQueue` currently doesn't support any custom submit options + +### Release options + +The `RedisQueue` currently doesn't support any custom release options + +## License + +This package is licensed under the MIT license + +## Contributions + +Pull-Requests are more than welcome. Make sure to read the [Code Of Conduct](CodeOfConduct.rst). \ No newline at end of file diff --git a/Tests/Functional/Queue/RedisQueueTest.php b/Tests/Functional/Queue/RedisQueueTest.php index 1822204..fbef716 100644 --- a/Tests/Functional/Queue/RedisQueueTest.php +++ b/Tests/Functional/Queue/RedisQueueTest.php @@ -11,136 +11,19 @@ * source code. */ -use Flowpack\JobQueue\Common\Queue\Message; +use Flowpack\JobQueue\Common\Tests\Functional\AbstractQueueTest; use Flowpack\JobQueue\Redis\Queue\RedisQueue; -use Predis\Client as PredisClient; -use TYPO3\Flow\Configuration\ConfigurationManager; -use TYPO3\Flow\Tests\FunctionalTestCase; /** * Functional test for RedisQueue */ -class RedisQueueTest extends FunctionalTestCase +class RedisQueueTest extends AbstractQueueTest { - - /** - * @var RedisQueue - */ - protected $queue; - - /** - * Set up dependencies - */ - public function setUp() - { - parent::setUp(); - $configurationManager = $this->objectManager->get(ConfigurationManager::class); - $settings = $configurationManager->getConfiguration(ConfigurationManager::CONFIGURATION_TYPE_SETTINGS, 'Flowpack.JobQueue.Redis'); - if (!isset($settings['testing']['enabled']) || $settings['testing']['enabled'] !== TRUE) { - $this->markTestSkipped('Test database is not configured'); - } - - $this->queue = new RedisQueue('Test queue', $settings['testing']); - - $client = new \Redis(); - if (!$client->connect($settings['testing']['client']['host'], $settings['testing']['client']['port'])) { - $this->fail('Could not connect to Redis'); - } - if (!$client->select($settings['testing']['client']['database'])) { - $this->fail('Could not select database'); - } - $client->flushDB(); - } - - /** - * @test - */ - public function submitAndWaitWithMessageWorks() - { - $message = new Message('Yeah, tell someone it works!'); - $this->queue->submit($message); - - $result = $this->queue->waitAndTake(1); - $this->assertNotNull($result, 'wait should receive message'); - $this->assertEquals($message->getPayload(), $result->getPayload(), 'message should have payload as before'); - } - - /** - * @test - */ - public function waitForMessageTimesOut() - { - $result = $this->queue->waitAndTake(1); - $this->assertNull($result, 'wait should return NULL after timeout'); - } - - /** - * @test - */ - public function identifierMakesMessagesUnique() - { - $message = new Message('Yeah, tell someone it works!', 'test.message'); - $identicalMessage = new Message('Yeah, tell someone it works!', 'test.message'); - $this->queue->submit($message); - $this->queue->submit($identicalMessage); - - $this->assertEquals(Message::STATE_NEW, $identicalMessage->getState()); - - $result = $this->queue->waitAndTake(1); - $this->assertNotNull($result, 'wait should receive message'); - - $result = $this->queue->waitAndTake(1); - $this->assertNull($result, 'message should not be queued twice'); - } - /** - * @test + * @inheritdoc */ - public function peekReturnsNextMessagesIfQueueHasMessages() + protected function getQueue() { - $message = new Message('First message'); - $this->queue->submit($message); - $message = new Message('Another message'); - $this->queue->submit($message); - - $results = $this->queue->peek(1); - $this->assertEquals(1, count($results), 'peek should return a message'); - $result = $results[0]; - $this->assertEquals('First message', $result->getPayload()); - $this->assertEquals(Message::STATE_SUBMITTED, $result->getState()); - - $results = $this->queue->peek(1); - $this->assertEquals(1, count($results), 'peek should return a message again'); - $result = $results[0]; - $this->assertEquals('First message', $result->getPayload(), 'second peek should return the same message again'); + return new RedisQueue('Test queue', $this->queueSettings); } - - /** - * @test - */ - public function peekReturnsNullIfQueueHasNoMessage() - { - $result = $this->queue->peek(); - $this->assertEquals(array(), $result, 'peek should not return a message'); - } - - /** - * @test - */ - public function waitAndReserveWithFinishRemovesMessage() - { - $message = new Message('First message'); - $this->queue->submit($message); - - $result = $this->queue->waitAndReserve(1); - $this->assertNotNull($result, 'waitAndReserve should receive message'); - $this->assertEquals($message->getPayload(), $result->getPayload(), 'message should have payload as before'); - - $result = $this->queue->peek(); - $this->assertEquals(array(), $result, 'no message should be present in queue'); - - $finishResult = $this->queue->finish($message); - $this->assertTrue($finishResult); - } - } \ No newline at end of file