From 3298d332e623aa203b58509248ad2f2f1ca24322 Mon Sep 17 00:00:00 2001 From: Volker Date: Thu, 2 Dec 2021 14:26:59 +0100 Subject: [PATCH 1/3] Add LazyConnectionPool --- src/Io/LazyConnectionPool.php | 96 +++++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 src/Io/LazyConnectionPool.php diff --git a/src/Io/LazyConnectionPool.php b/src/Io/LazyConnectionPool.php new file mode 100644 index 0000000..6e32cfa --- /dev/null +++ b/src/Io/LazyConnectionPool.php @@ -0,0 +1,96 @@ +connectionSelector = $connectionSelector; + $this->poolSize = $poolSize; + for ($i = 0; $i < $poolSize; $i++) { + $this->pool[$i] = $connection = $factory->createLazyConnection($connectionURI); + $this->requestCounter[$i] = 0; + Util::forwardEvents($connection, $this, ['error', 'close']); + } + } + + protected function shiftPoolPointer(): int + { + switch ($this->connectionSelector) { + case self::CS_ROUND_ROBIN: + $this->poolPointer = ($this->poolPointer + 1) % $this->poolSize; + break; + case self::CS_BY_LOAD: + $rcList = $this->requestCounter; // copy + asort($rcList, SORT_NUMERIC); + $this->poolPointer = key($rcList); + break; + } + return $this->poolPointer; + } + + public function query($sql, array $params = array()): \React\Promise\PromiseInterface + { + $pointer = $this->shiftPoolPointer(); + $this->requestCounter[$pointer]++; + return $this->pool[$pointer]->query($sql, $params)->then(function ($result) use ($pointer) { + $this->requestCounter[$pointer]--; + return $result; + }); + } + + public function queryStream($sql, $params = array()): \React\Stream\ReadableStreamInterface + { + $pointer = $this->shiftPoolPointer(); + $this->requestCounter[$pointer]++; + return $this->pool[$pointer]->queryStream($sql, $params)->then(function ($result) use ($pointer) { + $this->requestCounter[$pointer]--; + return $result; + }); + } + + public function ping(): \React\Promise\PromiseInterface + { + $pointer = $this->shiftPoolPointer(); + $this->requestCounter[$pointer]++; + return $this->pool[$pointer]->ping()->then(function ($result) use ($pointer) { + $this->requestCounter[$pointer]--; + return $result; + }); + } + + public function quit(): \React\Promise\PromiseInterface + { + return resolve(array_map(function ($connection) { + $connection->quit(); + return $connection; + }, $this->pool)); + } + + public function close(): \React\Promise\PromiseInterface + { + return resolve(array_map(function ($connection) { + $connection->close(); + return $connection; + }, $this->pool)); + } +} \ No newline at end of file From ca973d3262a234e0f9c27fc4b80dde9d65f0c096 Mon Sep 17 00:00:00 2001 From: Volker Date: Thu, 2 Dec 2021 14:35:27 +0100 Subject: [PATCH 2/3] remove unused class vars --- src/Io/LazyConnectionPool.php | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/Io/LazyConnectionPool.php b/src/Io/LazyConnectionPool.php index 6e32cfa..d78658e 100644 --- a/src/Io/LazyConnectionPool.php +++ b/src/Io/LazyConnectionPool.php @@ -14,8 +14,6 @@ class LazyConnectionPool extends EventEmitter implements ConnectionInterface const CS_ROUND_ROBIN = 'round-robin'; const CS_BY_LOAD = 'load'; - protected Factory $factory; - protected string $connectionURI; protected array $pool = []; protected int $poolSize; protected int $poolPointer = 0; // current connection in pool - RoundRobin @@ -93,4 +91,4 @@ public function close(): \React\Promise\PromiseInterface return $connection; }, $this->pool)); } -} \ No newline at end of file +} From 08ab979db3a744e82f08042e44b673dc11aa8817 Mon Sep 17 00:00:00 2001 From: Volker Date: Tue, 7 Dec 2021 08:29:53 +0100 Subject: [PATCH 3/3] keep in mind, DRY --- src/Io/LazyConnectionPool.php | 34 ++++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/src/Io/LazyConnectionPool.php b/src/Io/LazyConnectionPool.php index d78658e..3f3c713 100644 --- a/src/Io/LazyConnectionPool.php +++ b/src/Io/LazyConnectionPool.php @@ -31,6 +31,10 @@ public function __construct(Factory $factory, string $connectionURI, int $poolSi } } + /** + * set the internal pool-pointer to the next valid connection on depending on the connectionSelector + * @return int + */ protected function shiftPoolPointer(): int { switch ($this->connectionSelector) { @@ -46,33 +50,39 @@ protected function shiftPoolPointer(): int return $this->poolPointer; } - public function query($sql, array $params = array()): \React\Promise\PromiseInterface + /** + * @param callable $callback received an ConnectionInterface as parameter + * @return mixed + */ + protected function pooledCallback(callable $callback) { $pointer = $this->shiftPoolPointer(); $this->requestCounter[$pointer]++; - return $this->pool[$pointer]->query($sql, $params)->then(function ($result) use ($pointer) { + $connection = $this->pool[$pointer]; + return $callback($connection)->then(function ($result) use ($pointer) { $this->requestCounter[$pointer]--; return $result; }); } + public function query($sql, array $params = array()): \React\Promise\PromiseInterface + { + return $this->pooledCallback(function (ConnectionInterface $connection) use ($sql, $params) { + return $connection->query($sql, $params); + }); + } + public function queryStream($sql, $params = array()): \React\Stream\ReadableStreamInterface { - $pointer = $this->shiftPoolPointer(); - $this->requestCounter[$pointer]++; - return $this->pool[$pointer]->queryStream($sql, $params)->then(function ($result) use ($pointer) { - $this->requestCounter[$pointer]--; - return $result; + return $this->pooledCallback(function (ConnectionInterface $connection) use ($sql, $params) { + return $connection->queryStream($sql, $params); }); } public function ping(): \React\Promise\PromiseInterface { - $pointer = $this->shiftPoolPointer(); - $this->requestCounter[$pointer]++; - return $this->pool[$pointer]->ping()->then(function ($result) use ($pointer) { - $this->requestCounter[$pointer]--; - return $result; + return $this->pooledCallback(function (ConnectionInterface $connection) { + return $connection->ping(); }); }