Skip to content

Commit

Permalink
Merge pull request #12 from SwissBorg/chore/format
Browse files Browse the repository at this point in the history
Update and simplify scalafmt configuration
  • Loading branch information
lomigmegard authored Feb 16, 2024
2 parents 91cef25 + 837aac5 commit 6c1f771
Show file tree
Hide file tree
Showing 84 changed files with 997 additions and 893 deletions.
61 changes: 7 additions & 54 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,56 +1,9 @@
version = 3.0.6
version = 3.5.8

style = defaultWithAlign
runner.dialect = scala213

docstrings.style = Asterisk
docstrings.wrap = no
indentOperator.preset = spray
maxColumn = 120
rewrite.rules = [RedundantParens, SortImports, AvoidInfix]
unindentTopLevelOperators = true
align.tokens = [{code = "=>", owner = "Case"}]
align.openParenDefnSite = false
align.openParenCallSite = false
optIn.breakChainOnFirstMethodDot = false
optIn.configStyleArguments = false
danglingParentheses.defnSite = false
danglingParentheses.callSite = false
spaces.inImportCurlyBraces = true
rewrite.neverInfix.excludeFilters = [
and
min
max
until
to
by
eq
ne
"should.*"
"contain.*"
"must.*"
in
ignore
be
taggedAs
thrownBy
synchronized
have
when
size
only
noneOf
oneElementOf
noElementsOf
atLeastOneElementOf
atMostOneElementOf
allElementsOf
inOrderElementsOf
theSameElementsAs
]
rewriteTokens = {
"⇒": "=>"
"→": "->"
"←": "<-"
}
newlines.afterCurlyLambda = preserve
newlines.implicitParamListModifierPrefer = before
maxColumn = 120

rewrite.rules = [SortModifiers, Imports]
rewrite.imports.sort = scalastyle
rewrite.imports.groups = [["(?!javax?\\.).*", "(?!scala\\.).*"], ["javax?\\..*", "scala\\..*"]]
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

package org.apache.pekko.persistence.postgres.config

import org.apache.pekko.persistence.postgres.util.ConfigOps._
import com.typesafe.config.Config
import org.apache.pekko.persistence.postgres.util.ConfigOps._

import scala.concurrent.duration._

Expand Down Expand Up @@ -169,14 +169,16 @@ object JournalSequenceRetrievalConfig {
maxTries = config.asInt("journal-sequence-retrieval.max-tries", 10),
queryDelay = config.asFiniteDuration("journal-sequence-retrieval.query-delay", 1.second),
maxBackoffQueryDelay = config.asFiniteDuration("journal-sequence-retrieval.max-backoff-query-delay", 1.minute),
askTimeout = config.asFiniteDuration("journal-sequence-retrieval.ask-timeout", 1.second))
askTimeout = config.asFiniteDuration("journal-sequence-retrieval.ask-timeout", 1.second)
)
}
case class JournalSequenceRetrievalConfig(
batchSize: Int,
maxTries: Int,
queryDelay: FiniteDuration,
maxBackoffQueryDelay: FiniteDuration,
askTimeout: FiniteDuration)
askTimeout: FiniteDuration
)

class ReadJournalConfig(config: Config) {
val journalTableConfiguration = new JournalTableConfiguration(config)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package org.apache.pekko.persistence.postgres.db

import java.sql.SQLException

import org.slf4j.Logger

import java.sql.SQLException
import scala.concurrent.ExecutionContext
import scala.util.{ Failure, Success }
import scala.util.{Failure, Success}

object DbErrors {

Expand All @@ -14,8 +13,9 @@ object DbErrors {
val PgDuplicateTable: String = "42P07"
val PgUniqueViolation: String = "23505"

def withHandledPartitionErrors(logger: Logger, partitionDetails: String)(dbio: DBIOAction[_, NoStream, Effect])(
implicit ec: ExecutionContext): DBIOAction[Unit, NoStream, Effect] =
def withHandledPartitionErrors(logger: Logger, partitionDetails: String)(
dbio: DBIOAction[_, NoStream, Effect]
)(implicit ec: ExecutionContext): DBIOAction[Unit, NoStream, Effect] =
dbio.asTry.flatMap {
case Failure(ex: SQLException) if ex.getSQLState == PgDuplicateTable =>
logger.debug(s"Partition for $partitionDetails already exists")
Expand All @@ -28,8 +28,9 @@ object DbErrors {
DBIO.successful(())
}

def withHandledIndexErrors(logger: Logger, indexDetails: String)(dbio: DBIOAction[_, NoStream, Effect])(
implicit ec: ExecutionContext): DBIOAction[Unit, NoStream, Effect] =
def withHandledIndexErrors(logger: Logger, indexDetails: String)(
dbio: DBIOAction[_, NoStream, Effect]
)(implicit ec: ExecutionContext): DBIOAction[Unit, NoStream, Effect] =
dbio.asTry.flatMap {
case Failure(ex: SQLException) if ex.getSQLState == PgUniqueViolation =>
logger.debug(s"Index $indexDetails already exists")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,38 @@

package org.apache.pekko.persistence.postgres.db

import com.typesafe.config.Config
import org.apache.pekko.actor.ActorSystem
import javax.naming.InitialContext
import org.apache.pekko.persistence.postgres.config.SlickConfiguration
import com.typesafe.config.Config
import slick.basic.DatabaseConfig
import slick.jdbc.JdbcProfile
import slick.jdbc.JdbcBackend._
import slick.jdbc.JdbcProfile

/**
* INTERNAL API
*/
import javax.naming.InitialContext

/** INTERNAL API
*/
@deprecated(message = "Internal API, will be removed in 4.0.0", since = "3.4.0")
object SlickDriver {

/**
* INTERNAL API
*/
/** INTERNAL API
*/
@deprecated(message = "Internal API, will be removed in 4.0.0", since = "3.4.0")
def forDriverName(config: Config): JdbcProfile =
SlickDatabase.profile(config, "slick")
}

/**
* INTERNAL API
*/
/** INTERNAL API
*/
object SlickDatabase {

/**
* INTERNAL API
*/
/** INTERNAL API
*/
private[postgres] def profile(config: Config, path: String): JdbcProfile =
DatabaseConfig.forConfig[JdbcProfile](path, config).profile

/**
* INTERNAL API
*/
/** INTERNAL API
*/
private[postgres] def database(config: Config, slickConfiguration: SlickConfiguration, path: String): Database = {
slickConfiguration.jndiName
.map(Database.forName(_, None))
Expand All @@ -50,13 +46,13 @@ object SlickDatabase {
.getOrElse(Database.forConfig(path, config))
}

/**
* INTERNAL API
*/
/** INTERNAL API
*/
private[postgres] def initializeEagerly(
config: Config,
slickConfiguration: SlickConfiguration,
path: String): SlickDatabase = {
path: String
): SlickDatabase = {
val dbPath = if (path.isEmpty) "db" else s"$path.db"
EagerSlickDatabase(database(config, slickConfiguration, dbPath), profile(config, path))
}
Expand All @@ -66,22 +62,20 @@ trait SlickDatabase {
def database: Database
def profile: JdbcProfile

/**
* If true, the requesting side usually a (read/write/snapshot journal)
* should shutdown the database when it closes. If false, it should leave
* the database connection pool open, since it might still be used elsewhere.
*/
/** If true, the requesting side usually a (read/write/snapshot journal) should shutdown the database when it closes.
* If false, it should leave the database connection pool open, since it might still be used elsewhere.
*/
def allowShutdown: Boolean
}

case class EagerSlickDatabase(database: Database, profile: JdbcProfile) extends SlickDatabase {
override def allowShutdown: Boolean = true
}

/**
* A LazySlickDatabase lazily initializes a database, it also manages the shutdown of the database
* @param config The configuration used to create the database
*/
/** A LazySlickDatabase lazily initializes a database, it also manages the shutdown of the database
* @param config
* The configuration used to create the database
*/
class LazySlickDatabase(config: Config, system: ActorSystem) extends SlickDatabase {
val profile: JdbcProfile = SlickDatabase.profile(config, path = "")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@

package org.apache.pekko.persistence.postgres.db

import org.apache.pekko.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }
import org.apache.pekko.persistence.postgres.config.{ ConfigKeys, SlickConfiguration }
import com.typesafe.config.{Config, ConfigObject}
import org.apache.pekko.actor.{ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider}
import org.apache.pekko.persistence.postgres.config.{ConfigKeys, SlickConfiguration}
import org.apache.pekko.persistence.postgres.util.ConfigOps._
import com.typesafe.config.{ Config, ConfigObject }

import scala.jdk.CollectionConverters._
import scala.util.{ Failure, Success }
import scala.util.{Failure, Success}

object SlickExtension extends ExtensionId[SlickExtensionImpl] with ExtensionIdProvider {
override def lookup: SlickExtension.type = SlickExtension
Expand All @@ -32,23 +32,20 @@ class SlickExtensionImpl(system: ExtendedActorSystem) extends Extension {
def database(config: Config): SlickDatabase = dbProvider.database(config)
}

/**
* User overridable database provider.
* Since this provider is called from an pekko extension it must be thread safe!
*
* A SlickDatabaseProvider is loaded using reflection,
* The instance is created using the following:
* - The fully qualified class name as configured in `postgres-journal.database-provider-fqcn`.
* - The constructor with one argument of type [[org.apache.pekko.actor.ActorSystem]] is used to create the instance.
* Therefore the class must have such a constructor.
*/
/** User overridable database provider. Since this provider is called from an pekko extension it must be thread safe!
*
* A SlickDatabaseProvider is loaded using reflection, The instance is created using the following:
* - The fully qualified class name as configured in `postgres-journal.database-provider-fqcn`.
* - The constructor with one argument of type [[org.apache.pekko.actor.ActorSystem]] is used to create the instance.
* Therefore the class must have such a constructor.
*/
trait SlickDatabaseProvider {

/**
* Create or retrieve the database
* @param config The configuration which may be used to create the database. If the database is shared
* then the SlickDatabaseProvider implementation may choose to ignore this parameter.
*/
/** Create or retrieve the database
* @param config
* The configuration which may be used to create the database. If the database is shared then the
* SlickDatabaseProvider implementation may choose to ignore this parameter.
*/
def database(config: Config): SlickDatabase
}

Expand All @@ -66,15 +63,18 @@ class DefaultSlickDatabaseProvider(system: ActorSystem) extends SlickDatabasePro
case (key, notAnObject) =>
throw new RuntimeException(
s"""Expected "pekko-persistence-postgres.shared-databases.$key" to be a config ConfigObject, but got ${notAnObject
.valueType()} (${notAnObject.getClass})""")
.valueType()} (${notAnObject.getClass})"""
)
}
.toMap

private def getSharedDbOrThrow(sharedDbName: String): LazySlickDatabase =
sharedDatabases.getOrElse(
sharedDbName,
throw new RuntimeException(
s"No shared database is configured under pekko-persistence-postgres.shared-databases.$sharedDbName"))
s"No shared database is configured under pekko-persistence-postgres.shared-databases.$sharedDbName"
)
)

def database(config: Config): SlickDatabase = {
config.asOptionalNonEmptyString(ConfigKeys.useSharedDb) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,34 @@

package org.apache.pekko.persistence.postgres.journal

import java.util.{ HashMap => JHMap, Map => JMap }

import com.typesafe.config.Config
import org.apache.pekko.Done
import org.apache.pekko.actor.{ ActorSystem, ExtendedActorSystem }
import org.apache.pekko.actor.{ActorSystem, ExtendedActorSystem}
import org.apache.pekko.pattern.pipe
import org.apache.pekko.persistence.postgres.config.JournalConfig
import org.apache.pekko.persistence.postgres.db.{ SlickDatabase, SlickExtension }
import org.apache.pekko.persistence.postgres.journal.PostgresAsyncWriteJournal.{ InPlaceUpdateEvent, WriteFinished }
import org.apache.pekko.persistence.postgres.journal.dao.{ JournalDao, JournalDaoWithUpdates }
import org.apache.pekko.persistence.{AtomicWrite, PersistentRepr}
import org.apache.pekko.persistence.journal.AsyncWriteJournal
import org.apache.pekko.persistence.{ AtomicWrite, PersistentRepr }
import org.apache.pekko.serialization.{ Serialization, SerializationExtension }
import org.apache.pekko.stream.{ Materializer, SystemMaterializer }
import com.typesafe.config.Config
import org.apache.pekko.persistence.postgres.config.JournalConfig
import org.apache.pekko.persistence.postgres.db.{SlickDatabase, SlickExtension}
import org.apache.pekko.persistence.postgres.journal.PostgresAsyncWriteJournal.{InPlaceUpdateEvent, WriteFinished}
import org.apache.pekko.persistence.postgres.journal.dao.{JournalDao, JournalDaoWithUpdates}
import org.apache.pekko.serialization.{Serialization, SerializationExtension}
import org.apache.pekko.stream.{Materializer, SystemMaterializer}
import slick.jdbc.JdbcBackend._

import java.util.{HashMap => JHMap, Map => JMap}
import scala.collection.immutable._
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success, Try }
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

object PostgresAsyncWriteJournal {
private case class WriteFinished(pid: String, f: Future[_])

/**
* Extra Plugin API: May be used to issue in-place updates for events.
* To be used only for data migrations such as "encrypt all events" and similar operations.
*
* The write payload may be wrapped in a [[org.apache.pekko.persistence.journal.Tagged]],
* in which case the new tags will be skipped and the old tags remain unchanged.
*/
/** Extra Plugin API: May be used to issue in-place updates for events. To be used only for data migrations such as
* "encrypt all events" and similar operations.
*
* The write payload may be wrapped in a [[org.apache.pekko.persistence.journal.Tagged]], in which case the new tags
* will be skipped and the old tags remain unchanged.
*/
final case class InPlaceUpdateEvent(persistenceId: String, seqNr: Long, write: AnyRef)
}

Expand All @@ -55,7 +53,8 @@ class PostgresAsyncWriteJournal(config: Config) extends AsyncWriteJournal {
(classOf[JournalConfig], journalConfig),
(classOf[Serialization], SerializationExtension(system)),
(classOf[ExecutionContext], ec),
(classOf[Materializer], mat))
(classOf[Materializer], mat)
)
system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[JournalDao](fqcn, args) match {
case Success(dao) => dao
case Failure(cause) => throw cause
Expand All @@ -66,8 +65,10 @@ class PostgresAsyncWriteJournal(config: Config) extends AsyncWriteJournal {
journalDao match {
case upgraded: JournalDaoWithUpdates => upgraded
case _ =>
throw new IllegalStateException(s"The ${journalDao.getClass} does NOT implement [JournalDaoWithUpdates], " +
s"which is required to perform updates of events! Please configure a valid update capable DAO (e.g. the default [FlatJournalDao].")
throw new IllegalStateException(
s"The ${journalDao.getClass} does NOT implement [JournalDaoWithUpdates], " +
s"which is required to perform updates of events! Please configure a valid update capable DAO (e.g. the default [FlatJournalDao]."
)
}

// readHighestSequence must be performed after pending write for a persistenceId
Expand Down Expand Up @@ -111,7 +112,8 @@ class PostgresAsyncWriteJournal(config: Config) extends AsyncWriteJournal {
}

override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(
recoveryCallback: PersistentRepr => Unit): Future[Unit] =
recoveryCallback: PersistentRepr => Unit
): Future[Unit] =
journalDao
.messagesWithBatch(persistenceId, fromSequenceNr, toSequenceNr, journalConfig.daoConfig.replayBatchSize, None)
.take(max)
Expand Down
Loading

0 comments on commit 6c1f771

Please sign in to comment.