Skip to content

Commit

Permalink
postgres_driver.nim: split connPool into writeConPool and readConPool
Browse files Browse the repository at this point in the history
This aims to avoid clashes in insert and select queries
because the inserts and selects can happen concurrently
in relay and store events, respectively.
  • Loading branch information
Ivansete-status committed Oct 27, 2023
1 parent 0ee4293 commit cfb2a5d
Showing 1 changed file with 29 additions and 18 deletions.
47 changes: 29 additions & 18 deletions waku/waku_archive/driver/postgres_driver/postgres_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import
export postgres_driver

type PostgresDriver* = ref object of ArchiveDriver
connPool: PgAsyncPool
## Establish a separate pools for read/write operations
writeConnPool: PgAsyncPool
readConnPool: PgAsyncPool

proc dropTableQuery(): string =
"DROP TABLE messages"
Expand Down Expand Up @@ -48,21 +50,25 @@ proc new*(T: type PostgresDriver,
onErrAction: OnErrHandler = nil):
ArchiveDriverResult[T] =

let connPoolRes = PgAsyncPool.new(dbUrl, maxConnections)
if connPoolRes.isErr():
return err("error creating PgAsyncPool: " & connPoolRes.error)
let readConnPool = PgAsyncPool.new(dbUrl, maxConnections).valueOr:
return err("error creating read conn pool PgAsyncPool")

let connPool = connPoolRes.get()
let writeConnPool = PgAsyncPool.new(dbUrl, maxConnections).valueOr:
return err("error creating write conn pool PgAsyncPool")

if not isNil(onErrAction):
asyncSpawn checkConnectivity(connPool, onErrAction)
asyncSpawn checkConnectivity(readConnPool, onErrAction)

return ok(PostgresDriver(connPool: connPool))
if not isNil(onErrAction):
asyncSpawn checkConnectivity(writeConnPool, onErrAction)

return ok(PostgresDriver(writeConnPool: writeConnPool,
readConnPool: readConnPool))

proc createMessageTable*(s: PostgresDriver):
Future[ArchiveDriverResult[void]] {.async.} =

let execRes = await s.connPool.exec(createTableQuery())
let execRes = await s.writeConnPool.exec(createTableQuery())
if execRes.isErr():
return err("error in createMessageTable: " & execRes.error)

Expand All @@ -71,7 +77,7 @@ proc createMessageTable*(s: PostgresDriver):
proc deleteMessageTable*(s: PostgresDriver):
Future[ArchiveDriverResult[void]] {.async.} =

let execRes = await s.connPool.exec(dropTableQuery())
let execRes = await s.writeConnPool.exec(dropTableQuery())
if execRes.isErr():
return err("error in deleteMessageTable: " & execRes.error)

Expand All @@ -97,7 +103,7 @@ method put*(s: PostgresDriver,
receivedTime: Timestamp):
Future[ArchiveDriverResult[void]] {.async.} =

let ret = await s.connPool.runStmt(insertRow(),
let ret = await s.writeConnPool.runStmt(insertRow(),
@[toHex(digest.data),
$receivedTime,
message.contentTopic,
Expand Down Expand Up @@ -144,7 +150,7 @@ method getAllMessages*(s: PostgresDriver):
Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
## Retrieve all messages from the store.

let rowsRes = await s.connPool.query("""SELECT storedAt, contentTopic,
let rowsRes = await s.readConnPool.query("""SELECT storedAt, contentTopic,
payload, pubsubTopic, version, timestamp,
id FROM messages ORDER BY storedAt ASC""",
newSeq[string](0))
Expand Down Expand Up @@ -214,7 +220,7 @@ method getMessages*(s: PostgresDriver,
query &= " LIMIT ?"
args.add($maxPageSize)

let rowsRes = await s.connPool.query(query, args)
let rowsRes = await s.readConnPool.query(query, args)
if rowsRes.isErr():
return err("failed to run query: " & rowsRes.error)

Expand All @@ -233,7 +239,7 @@ proc getInt(s: PostgresDriver,
Future[ArchiveDriverResult[int64]] {.async.} =
# Performs a query that is expected to return a single numeric value (int64)

let rowsRes = await s.connPool.query(query)
let rowsRes = await s.readConnPool.query(query, newSeq[string](0))
if rowsRes.isErr():
return err("failed in getRow: " & rowsRes.error)

Expand Down Expand Up @@ -286,7 +292,7 @@ method deleteMessagesOlderThanTimestamp*(
ts: Timestamp):
Future[ArchiveDriverResult[void]] {.async.} =

let execRes = await s.connPool.exec(
let execRes = await s.writeConnPool.exec(
"DELETE FROM messages WHERE storedAt < " & $ts)
if execRes.isErr():
return err("error in deleteMessagesOlderThanTimestamp: " & execRes.error)
Expand All @@ -298,7 +304,7 @@ method deleteOldestMessagesNotWithinLimit*(
limit: int):
Future[ArchiveDriverResult[void]] {.async.} =

let execRes = await s.connPool.exec(
let execRes = await s.writeConnPool.exec(
"""DELETE FROM messages WHERE id NOT IN
(
SELECT id FROM messages ORDER BY storedAt DESC LIMIT ?
Expand All @@ -312,8 +318,13 @@ method deleteOldestMessagesNotWithinLimit*(
method close*(s: PostgresDriver):
Future[ArchiveDriverResult[void]] {.async.} =
## Close the database connection
let result = await s.connPool.close()
return result
(await s.writeConnPool.close()).isOkOr:
return err("error closing write pool: " & $error)

(await s.readConnPool.close()).isOkOr:
return err("error closing read pool: " & $error)

return ok()

proc sleep*(s: PostgresDriver, seconds: int):
Future[ArchiveDriverResult[void]] {.async.} =
Expand All @@ -322,7 +333,7 @@ proc sleep*(s: PostgresDriver, seconds: int):
# database for the amount of seconds given as a parameter.
try:
let params = @[$seconds]
let sleepRes = await s.connPool.query("SELECT pg_sleep(?)", params)
let sleepRes = await s.writeConnPool.query("SELECT pg_sleep(?)", params)
if sleepRes.isErr():
return err("error in postgres_driver sleep: " & sleepRes.error)
except DbError:
Expand Down

0 comments on commit cfb2a5d

Please sign in to comment.