diff --git a/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/db/TriplesStore.scala b/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/db/TriplesStore.scala index 7d403ca8e1..9933806c64 100644 --- a/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/db/TriplesStore.scala +++ b/acceptance-tests/src/test/scala/io/renku/graph/acceptancetests/db/TriplesStore.scala @@ -25,6 +25,7 @@ import io.renku.db.DBConfigProvider import io.renku.triplesgenerator.TgLockDB.SessionResource import io.renku.triplesgenerator.{TgLockDB, TgLockDbConfigProvider} import io.renku.triplesstore._ +import io.renku.triplesstore.client.util.JenaRunMode import org.typelevel.log4cats.Logger import scala.concurrent.duration._ diff --git a/build.sbt b/build.sbt index 8dee2571ac..d690c02706 100644 --- a/build.sbt +++ b/build.sbt @@ -50,6 +50,7 @@ lazy val root = project triplesGeneratorApi, entitiesSearch, entitiesViewingsCollector, + projectAuth, triplesGenerator, knowledgeGraph ) @@ -159,6 +160,15 @@ lazy val entitiesSearch = project .dependsOn(graphCommons % "compile->compile; test->test") .enablePlugins(AutomateHeaderPlugin) +lazy val projectAuth = project + .in(file("project-auth")) + .settings(commonSettings) + .dependsOn( + renkuModelTinyTypes % "compile->compile; test->test", + triplesStoreClient % "compile->compile; test->test" + ) + .enablePlugins(AutomateHeaderPlugin) + lazy val triplesGeneratorApi = project .in(file("triples-generator-api")) .withId("triples-generator-api") @@ -187,7 +197,8 @@ lazy val triplesGenerator = project .dependsOn( triplesGeneratorApi % "compile->compile; test->test", entitiesSearch, - entitiesViewingsCollector % "compile->compile; test->test" + entitiesViewingsCollector % "compile->compile; test->test", + projectAuth % "compile->compile; test->test" ) .enablePlugins( JavaAppPackaging, diff --git a/entities-search/src/test/scala/io/renku/entities/searchgraphs/SearchInfoDatasets.scala b/entities-search/src/test/scala/io/renku/entities/searchgraphs/SearchInfoDatasets.scala index f40d5cae1e..a5374f727a 100644 --- a/entities-search/src/test/scala/io/renku/entities/searchgraphs/SearchInfoDatasets.scala +++ b/entities-search/src/test/scala/io/renku/entities/searchgraphs/SearchInfoDatasets.scala @@ -49,14 +49,16 @@ trait SearchInfoDatasets { def provisionProjects(projects: entities.Project*)(implicit entityFunctions: EntityFunctions[entities.Project], - graphsProducer: GraphsProducer[entities.Project] + graphsProducer: GraphsProducer[entities.Project], + renkuUrl: RenkuUrl ): IO[Unit] = projects.traverse_[IO, Unit](provisionProject) def provisionProject( project: entities.Project )(implicit entityFunctions: EntityFunctions[entities.Project], - graphsProducer: GraphsProducer[entities.Project] + graphsProducer: GraphsProducer[entities.Project], + renkuUrl: RenkuUrl ): IO[Unit] = uploadIO(projectsDataset, graphsProducer(project): _*) >> insertSearchInfo(project) diff --git a/generators/build.sbt b/generators/build.sbt index db5ae4a6b7..95dc0e2640 100644 --- a/generators/build.sbt +++ b/generators/build.sbt @@ -24,5 +24,6 @@ libraryDependencies ++= Dependencies.circeCore ++ Dependencies.jsonld4s ++ Dependencies.catsCore ++ + Dependencies.fs2Core ++ Dependencies.scalacheck).map(_ % Test) ++ Dependencies.ip4s diff --git a/generators/src/test/scala/io/renku/generators/Generators.scala b/generators/src/test/scala/io/renku/generators/Generators.scala index df96e3f6f4..94d6e6dbda 100644 --- a/generators/src/test/scala/io/renku/generators/Generators.scala +++ b/generators/src/test/scala/io/renku/generators/Generators.scala @@ -18,15 +18,18 @@ package io.renku.generators +import cats.arrow.FunctionK import cats.data.NonEmptyList +import cats.effect.IO import cats.syntax.all._ -import cats.{Applicative, Functor, Monad, Semigroupal} +import cats.{Applicative, Functor, Monad, Semigroupal, ~>} import com.comcast.ip4s.Port import eu.timepit.refined.api.Refined import eu.timepit.refined.auto._ import eu.timepit.refined.collection.NonEmpty import eu.timepit.refined.numeric.{Negative, NonNegative, NonPositive, Positive} import eu.timepit.refined.string.Url +import fs2.Stream import io.circe.{Encoder, Json} import org.scalacheck.Gen._ import org.scalacheck.{Arbitrary, Gen} @@ -354,6 +357,9 @@ object Generators { def generateNonEmptyList(min: Int = 1, max: Int = 5): NonEmptyList[T] = generateExample(nonEmptyList(generator, min, max)) + def asStream: Stream[Gen, T] = + Stream.eval(generator) ++ asStream + def generateOption: Option[T] = Gen.option(generator).sample getOrElse generateOption def generateSome: Option[T] = Option(generator.generateOne) @@ -410,6 +416,15 @@ object Generators { } } + private def runGen[A](ga: Gen[A]): IO[A] = IO(ga.generateOne) + + private val genToIO: Gen ~> IO = + FunctionK.lift[Gen, IO](runGen) + + implicit class GenStreamOps[A](gens: Stream[Gen, A]) { + def toIO: Stream[IO, A] = gens.translate(genToIO) + } + implicit def asArbitrary[T](implicit generator: Gen[T]): Arbitrary[T] = Arbitrary(generator) implicit val semigroupalGen: Semigroupal[Gen] = new Semigroupal[Gen] { diff --git a/graph-commons/src/main/scala/io/renku/triplesstore/DatasetConnectionConfig.scala b/graph-commons/src/main/scala/io/renku/triplesstore/DatasetConnectionConfig.scala index 015126d8b3..8399568cd7 100644 --- a/graph-commons/src/main/scala/io/renku/triplesstore/DatasetConnectionConfig.scala +++ b/graph-commons/src/main/scala/io/renku/triplesstore/DatasetConnectionConfig.scala @@ -25,6 +25,8 @@ import io.renku.config.ConfigLoader.urlTinyTypeReader import io.renku.http.client.{BasicAuthCredentials, BasicAuthPassword, BasicAuthUsername} import io.renku.tinytypes.constraints.{Url, UrlOps} import io.renku.tinytypes.{TinyTypeFactory, UrlTinyType} +import io.renku.triplesstore.client.http.{ConnectionConfig, Retry} +import org.http4s.{BasicCredentials, Uri} import pureconfig.ConfigReader trait FusekiConnectionConfig { @@ -50,6 +52,13 @@ trait DatasetConnectionConfig extends FusekiConnectionConfig { val fusekiUrl: FusekiUrl val datasetName: DatasetName val authCredentials: BasicAuthCredentials + + def toCC(retryCfg: Option[Retry.RetryConfig] = None): ConnectionConfig = + ConnectionConfig( + Uri.unsafeFromString(fusekiUrl.value) / datasetName.value, + Some(BasicCredentials(authCredentials.username.value, authCredentials.password.value)), + retryCfg + ) } final case class ProjectsConnectionConfig(fusekiUrl: FusekiUrl, authCredentials: BasicAuthCredentials) diff --git a/graph-commons/src/test/scala/io/renku/triplesstore/ExternalJenaForSpec.scala b/graph-commons/src/test/scala/io/renku/triplesstore/ExternalJenaForSpec.scala index 9a80b0bde5..83092856ca 100644 --- a/graph-commons/src/test/scala/io/renku/triplesstore/ExternalJenaForSpec.scala +++ b/graph-commons/src/test/scala/io/renku/triplesstore/ExternalJenaForSpec.scala @@ -20,6 +20,7 @@ package io.renku.triplesstore import eu.timepit.refined.auto._ import io.renku.testtools.IOSpec +import io.renku.triplesstore.client.util.JenaRunMode import org.scalatest.Suite /** Use this trait as a replacement for [[InMemoryJenaForSpec]] to connect to a locally/externally running Jena without diff --git a/graph-commons/src/test/scala/io/renku/triplesstore/InMemoryJena.scala b/graph-commons/src/test/scala/io/renku/triplesstore/InMemoryJena.scala index 83aa0e8c98..f7a3b1f40f 100644 --- a/graph-commons/src/test/scala/io/renku/triplesstore/InMemoryJena.scala +++ b/graph-commons/src/test/scala/io/renku/triplesstore/InMemoryJena.scala @@ -21,10 +21,8 @@ package io.renku.triplesstore import cats.effect.IO import cats.effect.unsafe.IORuntime import cats.syntax.all._ -import com.dimafeng.testcontainers.{FixedHostPortGenericContainer, GenericContainer, SingleContainer} -import eu.timepit.refined.api.Refined +import com.dimafeng.testcontainers.SingleContainer import eu.timepit.refined.auto._ -import eu.timepit.refined.numeric.Positive import io.circe.{Decoder, HCursor, Json} import io.renku.graph.model._ import io.renku.graph.model.entities.{EntityFunctions, Person} @@ -36,8 +34,7 @@ import io.renku.jsonld._ import io.renku.logging.TestSparqlQueryTimeRecorder import io.renku.triplesstore.client.model.{Quad, Triple} import io.renku.triplesstore.client.syntax._ -import org.testcontainers.containers -import org.testcontainers.containers.wait.strategy.Wait +import io.renku.triplesstore.client.util.{JenaContainer, JenaRunMode} import scala.collection.mutable import scala.language.reflectiveCalls @@ -48,35 +45,9 @@ trait InMemoryJena { private val adminCredentials = BasicAuthCredentials(BasicAuthUsername("admin"), BasicAuthPassword("admin")) - lazy val container: SingleContainer[_] = jenaRunMode match { - case JenaRunMode.GenericContainer => - GenericContainer( - dockerImage = "renku/renku-jena:0.0.21", - exposedPorts = Seq(3030), - waitStrategy = Wait forHttp "/$/ping" - ) - case JenaRunMode.FixedPortContainer(fixedPort) => - FixedHostPortGenericContainer( - imageName = "renku/renku-jena:0.0.21", - exposedPorts = Seq(3030), - exposedHostPort = fixedPort, - exposedContainerPort = fixedPort, - waitStrategy = Wait forHttp "/$/ping" - ) - case JenaRunMode.Local(_) => - new GenericContainer(new containers.GenericContainer("") { - override def start(): Unit = () - override def stop(): Unit = () - }) - } - - private lazy val fusekiServerPort: Int Refined Positive = jenaRunMode match { - case JenaRunMode.GenericContainer => Refined.unsafeApply(container.mappedPort(container.exposedPorts.head)) - case JenaRunMode.FixedPortContainer(port) => port - case JenaRunMode.Local(port) => port - } + lazy val container: SingleContainer[_] = JenaContainer.create(jenaRunMode) - lazy val fusekiUrl: FusekiUrl = FusekiUrl(s"http://localhost:$fusekiServerPort") + lazy val fusekiUrl: FusekiUrl = FusekiUrl(JenaContainer.fusekiUrl(jenaRunMode, container)) private val datasets: mutable.Map[FusekiUrl => DatasetConnectionConfig, DatasetConfigFile] = mutable.Map.empty diff --git a/project-auth/build.sbt b/project-auth/build.sbt new file mode 100644 index 0000000000..0325353b3c --- /dev/null +++ b/project-auth/build.sbt @@ -0,0 +1,4 @@ +organization := "io.renku" +name := "project-auth" + +libraryDependencies ++= Dependencies.http4sClient diff --git a/project-auth/src/main/scala/io/renku/projectauth/ProjectAuthData.scala b/project-auth/src/main/scala/io/renku/projectauth/ProjectAuthData.scala new file mode 100644 index 0000000000..c31fb7b794 --- /dev/null +++ b/project-auth/src/main/scala/io/renku/projectauth/ProjectAuthData.scala @@ -0,0 +1,45 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.projectauth + +import io.renku.graph.model.projects.{ResourceId, Slug, Visibility} +import io.renku.graph.model.{RenkuUrl, Schemas} +import io.renku.jsonld.JsonLD.JsonLDArray +import io.renku.jsonld.syntax._ +import io.renku.jsonld.{EntityTypes, JsonLD, JsonLDEncoder} + +final case class ProjectAuthData( + slug: Slug, + members: Set[ProjectMember], + visibility: Visibility +) + +object ProjectAuthData { + implicit def jsonLDEncoder(implicit renkuUrl: RenkuUrl): JsonLDEncoder[ProjectAuthData] = + JsonLDEncoder.instance { data => + JsonLD.entity( + ResourceId(data.slug).asEntityId, + EntityTypes.of(Schemas.schema / "Project"), + Schemas.renku / "slug" -> data.slug.asJsonLD, + Schemas.renku / "visibility" -> data.visibility.asJsonLD, + Schemas.renku / "memberId" -> JsonLDArray(data.members.map(_.gitLabId.asJsonLD).toSeq), + Schemas.renku / "memberRole" -> JsonLDArray(data.members.map(_.encoded.asJsonLD).toSeq) + ) + } +} diff --git a/project-auth/src/main/scala/io/renku/projectauth/ProjectAuthService.scala b/project-auth/src/main/scala/io/renku/projectauth/ProjectAuthService.scala new file mode 100644 index 0000000000..113be3e8c1 --- /dev/null +++ b/project-auth/src/main/scala/io/renku/projectauth/ProjectAuthService.scala @@ -0,0 +1,147 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.projectauth + +import cats.MonadThrow +import cats.data.NonEmptyList +import cats.effect._ +import cats.syntax.all._ +import fs2.io.net.Network +import fs2.{Pipe, Stream} +import io.circe.Decoder +import io.renku.graph.model.projects.{Slug, Visibility} +import io.renku.graph.model.{RenkuUrl, Schemas} +import io.renku.jsonld.NamedGraph +import io.renku.jsonld.syntax._ +import io.renku.tinytypes.json.TinyTypeDecoders._ +import io.renku.triplesstore.client.http.{ConnectionConfig, RowDecoder, SparqlClient} +import io.renku.triplesstore.client.syntax._ +import org.typelevel.log4cats.Logger + +import scala.concurrent.duration._ + +/** Manage authorization data for projects and members. */ +trait ProjectAuthService[F[_]] { + + def update(data: ProjectAuthData): F[Unit] + + def updateAll: Pipe[F, ProjectAuthData, Nothing] + + def remove(slugs: NonEmptyList[Slug]): F[Unit] + def remove(slug: Slug, more: Slug*): F[Unit] = remove(NonEmptyList(slug, more.toList)) + + def getAll(chunkSize: Int = 100): Stream[F, ProjectAuthData] +} + +object ProjectAuthService { + + def resource[F[_]: Async: Network: Logger]( + connectionConfig: ConnectionConfig, + timeout: Duration = 20.minutes + )(implicit renkuUrl: RenkuUrl): Resource[F, ProjectAuthService[F]] = + SparqlClient(connectionConfig, timeout).map(c => apply[F](c, renkuUrl)) + + def apply[F[_]: MonadThrow](client: SparqlClient[F], renkuUrl: RenkuUrl): ProjectAuthService[F] = + new Impl[F](client, renkuUrl) + + private final class Impl[F[_]: MonadThrow](sparqlClient: SparqlClient[F], renkuUrl: RenkuUrl) + extends ProjectAuthService[F] { + private[this] val graph = Schemas.renku / "ProjectAuth" + private implicit val rUrl: RenkuUrl = renkuUrl + + override def remove(slugs: NonEmptyList[Slug]): F[Unit] = + sparqlClient.update(sparql"""PREFIX schema: + |PREFIX renku: + | + |DELETE { Graph $graph {?s ?p ?o} } + |WHERE { + | Graph $graph { + | ?s a schema:Project; + | renku:slug ?slug; + | ?p ?o. + | VALUES(?slug) { ${slugs.toList.map(_.value)} } + | } + |} + """.stripMargin) + + override def update(data: ProjectAuthData): F[Unit] = { + val jsonld = NamedGraph.fromJsonLDsUnsafe(graph, data.asJsonLD) + remove(data.slug) >> sparqlClient.upload(jsonld) + } + + override def updateAll: Pipe[F, ProjectAuthData, Nothing] = + _.chunks + .evalTap(_.toNel.map(_.map(_.slug)).map(remove).getOrElse(().pure[F])) + .map(chunk => + chunk.toNel match { // TODO improve that ergonomics for NamedGraph in jsonld4s + case Some(nel) => NamedGraph.fromJsonLDsUnsafe(graph, nel.head.asJsonLD, nel.tail.map(_.asJsonLD): _*) + case None => NamedGraph(graph, Seq.empty) + } + ) + .evalMap(sparqlClient.upload) + .drain + + override def getAll(chunkSize: Int): Stream[F, ProjectAuthData] = + streamAll(chunkSize) + + private def streamAll(chunkSize: Int) = + Stream + .iterate(0)(_ + chunkSize) + .evalMap(offset => getChunk(chunkSize, offset)) + .takeWhile(_.nonEmpty) + .flatMap(Stream.emits) + .groupAdjacentBy(_._1) + .map { case (slug, rest) => + val members = rest.toList.flatMap(_._3) + val vis = rest.head.map(_._2) + vis.map(v => ProjectAuthData(slug, members.toSet, v)) + } + .unNone + + private def getChunk(limit: Int, offset: Int) = + sparqlClient.queryDecode[(Slug, Visibility, Option[ProjectMember])]( + sparql"""PREFIX schema: + |PREFIX renku: + | + |SELECT ?slug ?visibility ?memberRole + |WHERE { + | Graph ${graph.asSparql} { + | ?project a schema:Project; + | renku:slug ?slug; + | renku:visibility ?visibility. + | OPTIONAL { + | ?project renku:memberRole ?memberRole. + | } + | } + |} + |ORDER BY ?slug + |OFFSET $offset + |LIMIT $limit + |""".stripMargin + ) + + private implicit val projectMemberDecoder: Decoder[ProjectMember] = + Decoder.decodeString.emap(ProjectMember.fromEncoded) + + private implicit val tupleRowDecoder: RowDecoder[(Slug, Visibility, Option[ProjectMember])] = + RowDecoder.forProduct3("slug", "visibility", "memberRole")( + Tuple3.apply[Slug, Visibility, Option[ProjectMember]] + ) + } +} diff --git a/project-auth/src/main/scala/io/renku/projectauth/ProjectMember.scala b/project-auth/src/main/scala/io/renku/projectauth/ProjectMember.scala new file mode 100644 index 0000000000..8bd8de36c8 --- /dev/null +++ b/project-auth/src/main/scala/io/renku/projectauth/ProjectMember.scala @@ -0,0 +1,48 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.projectauth + +import io.renku.graph.model.persons.GitLabId +import io.renku.graph.model.projects.Role + +final case class ProjectMember( + gitLabId: GitLabId, + role: Role +) { + + private[projectauth] def encoded: String = + s"${gitLabId.value}:${role.asString}" +} + +object ProjectMember { + private[projectauth] def fromEncoded(str: String): Either[String, ProjectMember] = + str.split(':').toList match { + case idStr :: roleStr :: Nil => + for { + id <- idStr.toIntOption.map(GitLabId.apply).toRight(s"Invalid person GitLabId: $idStr") + role <- Role.fromString(roleStr) + } yield ProjectMember(id, role) + + case _ => + Left(s"Invalid encoded project member: $str") + } + + def fromGitLabData(gitLabId: GitLabId, accessLevel: Int): ProjectMember = + ProjectMember(gitLabId, Role.fromGitLabAccessLevel(accessLevel)) +} diff --git a/project-auth/src/test/scala/io/renku/projectauth/Generators.scala b/project-auth/src/test/scala/io/renku/projectauth/Generators.scala new file mode 100644 index 0000000000..5524a8632a --- /dev/null +++ b/project-auth/src/test/scala/io/renku/projectauth/Generators.scala @@ -0,0 +1,39 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.projectauth + +import io.renku.graph.model.{RenkuTinyTypeGenerators, persons} +import org.scalacheck.Gen + +object Generators { + private val fixedIds = 1 to 800 + private val gitLabIds: Gen[persons.GitLabId] = + Gen.oneOf(fixedIds).map(persons.GitLabId.apply) + + val memberGen: Gen[ProjectMember] = for { + role <- RenkuTinyTypeGenerators.roleGen + id <- gitLabIds + } yield ProjectMember(id, role) + + val projectAuthDataGen: Gen[ProjectAuthData] = for { + slug <- RenkuTinyTypeGenerators.projectSlugs + members <- Gen.choose(0, 150).flatMap(n => Gen.listOfN(n, memberGen)) + visibility <- RenkuTinyTypeGenerators.projectVisibilities + } yield ProjectAuthData(slug, members.toSet, visibility) +} diff --git a/project-auth/src/test/scala/io/renku/projectauth/ProjectAuthServiceSpec.scala b/project-auth/src/test/scala/io/renku/projectauth/ProjectAuthServiceSpec.scala new file mode 100644 index 0000000000..b4abc443b5 --- /dev/null +++ b/project-auth/src/test/scala/io/renku/projectauth/ProjectAuthServiceSpec.scala @@ -0,0 +1,140 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.projectauth + +import cats.data.NonEmptyList +import cats.effect.IO +import cats.effect.kernel.Resource +import cats.effect.testing.scalatest.AsyncIOSpec +import fs2.Stream +import io.renku.generators.Generators.Implicits._ +import io.renku.graph.model.RenkuUrl +import io.renku.graph.model.persons.GitLabId +import io.renku.graph.model.projects.{Role, Visibility} +import io.renku.triplesstore.client.util.JenaContainerSpec +import org.scalatest.flatspec.AsyncFlatSpec +import org.scalatest.matchers.should +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.slf4j.Slf4jLogger + +class ProjectAuthServiceSpec extends AsyncFlatSpec with AsyncIOSpec with JenaContainerSpec with should.Matchers { + implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO] + implicit val renkuUrl: RenkuUrl = RenkuUrl("http://localhost/renku") + + def withProjectAuthService: Resource[IO, ProjectAuthService[IO]] = + withDataset("projectauth").map(ProjectAuthService[IO](_, renkuUrl)) + + it should "add data" in { + withProjectAuthService.use { s => + for { + data <- Generators.projectAuthDataGen.asStream.toIO.take(20).compile.toVector + _ <- Stream + .emits(data) + .through(s.updateAll) + .compile + .drain + + n <- s.getAll(15).compile.toVector + _ = n shouldBe data.sortBy(_.slug) + } yield () + } + } + + it should "work with no members" in { + withProjectAuthService.use { s => + for { + data <- Generators.projectAuthDataGen.asStream.toIO + .take(2) + .map(_.copy(members = Set.empty)) + .compile + .toVector + _ <- Stream + .emits(data) + .through(s.updateAll) + .compile + .drain + + n <- s.getAll().compile.toVector + _ = n shouldBe data.sortBy(_.slug) + } yield () + } + } + + it should "remove projects" in { + withProjectAuthService.use { s => + for { + original <- Generators.projectAuthDataGen.asStream.toIO + .take(1) + .map(_.copy(visibility = Visibility.Internal)) + .compile + .lastOrError + _ <- s.update(original) + + n <- s.getAll().compile.toVector + _ = n.head shouldBe original + + _ <- s.remove(original.slug) + _ <- s.getAll().compile.toVector.asserting(v => v shouldBe Vector.empty) + } yield () + } + } + + it should "remove selectively" in { + withProjectAuthService.use { s => + for { + data <- Generators.projectAuthDataGen.asStream.toIO + .take(6) + .compile + .toVector + _ <- Stream.emits(data).through(s.updateAll).compile.drain + + (toremove, tokeep) = data.splitAt(3) + _ <- s.remove(NonEmptyList.fromListUnsafe(toremove.map(_.slug).toList)) + + _ <- s.getAll().compile.toVector.asserting(v => v shouldBe tokeep.sortBy(_.slug)) + } yield () + } + } + + it should "update new properties" in { + withProjectAuthService.use { s => + for { + original <- Generators.projectAuthDataGen.asStream.toIO + .take(1) + .map(_.copy(visibility = Visibility.Internal)) + .compile + .lastOrError + _ <- s.update(original) + + n <- s.getAll().compile.toVector + _ = n.head shouldBe original + + second = original.copy(visibility = Visibility.Public) + _ <- s.update(second) + n2 <- s.getAll().compile.toVector + _ = n2.head shouldBe second + + third = second.copy(members = second.members + ProjectMember(GitLabId(43), Role.Reader)) + _ <- s.update(third) + n3 <- s.getAll().compile.toVector + _ = n3.head shouldBe third + } yield () + } + } +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 7e7282cc2d..fd7e81921d 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -12,6 +12,7 @@ object Dependencies { val circeGenericExtras = "0.14.3" val circeOptics = "0.14.1" val diffx = "0.8.3" + val fs2 = "3.8.0" val http4s = "0.23.23" val http4sEmber = "0.23.23" val http4sPrometheus = "0.24.4" @@ -131,10 +132,14 @@ object Dependencies { "org.typelevel" %% "log4cats-core" % V.log4cats ) - val testContainersPostgres = Seq( - "com.dimafeng" %% "testcontainers-scala-scalatest" % V.testContainersScala, - "com.dimafeng" %% "testcontainers-scala-postgresql" % V.testContainersScala + val testContainersScalaTest = Seq( + "com.dimafeng" %% "testcontainers-scala-scalatest" % V.testContainersScala ) + val testContainersPostgres = + testContainersScalaTest ++ + Seq( + "com.dimafeng" %% "testcontainers-scala-postgresql" % V.testContainersScala + ) val wiremock = Seq( "com.github.tomakehurst" % "wiremock-jre8" % V.wiremock @@ -176,6 +181,10 @@ object Dependencies { "io.renku" %% "jsonld4s" % V.jsonld4s ) + val fs2Core = Seq( + "co.fs2" %% "fs2-core" % V.fs2 + ) + val catsCore = Seq( "org.typelevel" %% "cats-core" % V.catsCore ) diff --git a/renku-model-tiny-types/src/main/scala/io/renku/graph/model/projects.scala b/renku-model-tiny-types/src/main/scala/io/renku/graph/model/projects.scala index 9413de0a58..563a419841 100644 --- a/renku-model-tiny-types/src/main/scala/io/renku/graph/model/projects.scala +++ b/renku-model-tiny-types/src/main/scala/io/renku/graph/model/projects.scala @@ -18,9 +18,12 @@ package io.renku.graph.model +import cats.data.NonEmptyList +import cats.kernel.Order import cats.syntax.all._ import eu.timepit.refined.api.Refined import eu.timepit.refined.numeric.Positive +import io.circe.{Decoder, Encoder} import io.renku.graph.model.views.{EntityIdJsonLDOps, NonBlankTTJsonLDOps, TinyTypeJsonLDOps, UrlResourceRenderer} import io.renku.jsonld.{EntityId, JsonLDDecoder, JsonLDEncoder} import io.renku.tinytypes.constraints._ @@ -206,4 +209,70 @@ object projects { extends TinyTypeFactory[Keyword](new Keyword(_)) with NonBlank[Keyword] with NonBlankTTJsonLDOps[Keyword] + + sealed trait Role extends Ordered[Role] { + def asString: String + } + + object Role { + case object Owner extends Role { + val asString = "owner" + + override def compare(that: Role): Int = + if (that == this) 0 else 1 + } + + case object Maintainer extends Role { + val asString = "maintainer" + + override def compare(that: Role): Int = + if (that == this) 0 + else if (that == Owner) -1 + else 1 + } + + case object Reader extends Role { + val asString = "reader" + + override def compare(that: Role): Int = + if (that == this) 0 + else -1 + } + + val all: NonEmptyList[Role] = + NonEmptyList.of(Owner, Maintainer, Reader) + + def fromString(str: String): Either[String, Role] = + all.find(_.asString.equalsIgnoreCase(str)).toRight(s"Invalid role name: $str") + + def unsafeFromString(str: String): Role = + fromString(str).fold(sys.error, identity) + + /** Translated from here: https://docs.gitlab.com/ee/api/members.html#roles */ + def fromGitLabAccessLevel(accessLevel: Int) = + accessLevel match { + case n if n >= 50 => Owner + case n if n >= 40 => Maintainer + case _ => Reader + } + + def toGitLabAccessLevel(role: Role): Int = + role match { + case Role.Owner => 50 + case Role.Maintainer => 40 + case Role.Reader => 20 + } + + implicit val ordering: Ordering[Role] = + Ordering.by(r => -all.toList.indexOf(r)) + + implicit val order: Order[Role] = + Order.fromOrdering + + implicit val jsonDecoder: Decoder[Role] = + Decoder.decodeString.emap(fromString) + + implicit val jsonEncoder: Encoder[Role] = + Encoder.encodeString.contramap(_.asString) + } } diff --git a/renku-model-tiny-types/src/test/scala/io/renku/graph/model/RenkuTinyTypeGenerators.scala b/renku-model-tiny-types/src/test/scala/io/renku/graph/model/RenkuTinyTypeGenerators.scala index 1b62cdb9e8..ab9fadb3cf 100644 --- a/renku-model-tiny-types/src/test/scala/io/renku/graph/model/RenkuTinyTypeGenerators.scala +++ b/renku-model-tiny-types/src/test/scala/io/renku/graph/model/RenkuTinyTypeGenerators.scala @@ -23,6 +23,7 @@ import io.renku.generators.Generators import io.renku.generators.Generators.Implicits._ import io.renku.generators.Generators._ import io.renku.graph.model.images.{ImageResourceId, ImageUri} +import io.renku.graph.model.projects.Role import io.renku.graph.model.versions.{CliVersion, SchemaVersion} import io.renku.tinytypes.InstantTinyType import org.scalacheck.Gen @@ -33,6 +34,9 @@ import scala.util.Random trait RenkuTinyTypeGenerators { + val roleGen: Gen[Role] = + Gen.oneOf(Role.all.toList) + def associationResourceIdGen: Gen[associations.ResourceId] = Generators.validatedUrls.map(_.value).map(associations.ResourceId) diff --git a/renku-model-tiny-types/src/test/scala/io/renku/graph/model/RoleSpec.scala b/renku-model-tiny-types/src/test/scala/io/renku/graph/model/RoleSpec.scala new file mode 100644 index 0000000000..b264e7050d --- /dev/null +++ b/renku-model-tiny-types/src/test/scala/io/renku/graph/model/RoleSpec.scala @@ -0,0 +1,34 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.graph.model + +import cats.data.NonEmptyList +import io.renku.graph.model.projects.Role +import io.renku.graph.model.projects.Role._ +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should + +class RoleSpec extends AnyFlatSpec with should.Matchers { + it should "order roles correctly" in { + val roles = Role.all.sorted + roles shouldBe NonEmptyList.of(Reader, Maintainer, Owner) + Role.all.toList.sorted shouldBe roles.toList + Role.all.toList.sortBy(identity) shouldBe roles.toList + } +} diff --git a/token-repository/src/test/scala/io/renku/tokenrepository/repository/creation/TokenValidatorSpec.scala b/token-repository/src/test/scala/io/renku/tokenrepository/repository/creation/TokenValidatorSpec.scala index 8a8f13f16f..b566ff1451 100644 --- a/token-repository/src/test/scala/io/renku/tokenrepository/repository/creation/TokenValidatorSpec.scala +++ b/token-repository/src/test/scala/io/renku/tokenrepository/repository/creation/TokenValidatorSpec.scala @@ -27,6 +27,7 @@ import io.circe.literal._ import io.renku.generators.CommonGraphGenerators.accessTokens import io.renku.generators.Generators.Implicits._ import io.renku.graph.model.RenkuTinyTypeGenerators.{personGitLabIds, projectIds} +import io.renku.graph.model.projects.Role import io.renku.graph.model.{persons, projects} import io.renku.http.client.RestClient.ResponseMappingF import io.renku.http.client.{AccessToken, GitLabClient} @@ -176,6 +177,8 @@ class MemberRightsCheckerSpec with TableDrivenPropertyChecks with should.Matchers { + def accessLevel(r: Role): Int = Role.toGitLabAccessLevel(r) + "checkValid" should { "return boolean based on the response from the GET to GL's project member API" in new TestCase { @@ -193,23 +196,23 @@ class MemberRightsCheckerSpec Table( ("Case", "Response", "Expected Result"), ("ok role 30 and active", - Response[IO](Ok).withEntity(json"""{"access_level": ${Role.Developer.value}, "state": "active"}"""), + Response[IO](Ok).withEntity(json"""{"access_level": ${accessLevel(Role.Reader)}, "state": "active"}"""), false ), ("ok role 40 and active", - Response[IO](Ok).withEntity(json"""{"access_level": ${Role.Maintainer.value}, "state": "active"}"""), + Response[IO](Ok).withEntity(json"""{"access_level": ${accessLevel(Role.Maintainer)}, "state": "active"}"""), true ), ("ok role 40 and non-active", - Response[IO](Ok).withEntity(json"""{"access_level": ${Role.Owner.value}, "state": "waiting"}"""), + Response[IO](Ok).withEntity(json"""{"access_level": ${accessLevel(Role.Owner)}, "state": "waiting"}"""), false ), ("ok role 40 and no state", - Response[IO](Ok).withEntity(json"""{"access_level": ${Role.Owner.value}}"""), + Response[IO](Ok).withEntity(json"""{"access_level": ${accessLevel(Role.Owner)}}"""), false ), ("ok role 50 and active", - Response[IO](Ok).withEntity(json"""{"access_level": ${Role.Owner.value}, "state": "active"}"""), + Response[IO](Ok).withEntity(json"""{"access_level": ${accessLevel(Role.Owner)}, "state": "active"}"""), true ), ("ok invalid", Response[IO](Ok).withEntity(json"""{}"""), false), @@ -230,13 +233,6 @@ class MemberRightsCheckerSpec } } - private sealed trait Role { val value: Int } - private object Role { - case object Developer extends Role { val value: Int = 30 } - case object Maintainer extends Role { val value: Int = 40 } - case object Owner extends Role { val value: Int = 50 } - } - private trait TestCase { implicit val gitLabClient: GitLabClient[IO] = mock[GitLabClient[IO]] val rightsChecker = new MemberRightsCheckerImpl[IO] diff --git a/triples-generator-api/src/test/scala/io/renku/triplesgenerator/api/TriplesGeneratorClientSpec.scala b/triples-generator-api/src/test/scala/io/renku/triplesgenerator/api/TriplesGeneratorClientSpec.scala index 2491736e2d..78f3f1ed1b 100644 --- a/triples-generator-api/src/test/scala/io/renku/triplesgenerator/api/TriplesGeneratorClientSpec.scala +++ b/triples-generator-api/src/test/scala/io/renku/triplesgenerator/api/TriplesGeneratorClientSpec.scala @@ -34,7 +34,11 @@ import org.scalatest.matchers.should import org.scalatest.wordspec.AsyncWordSpec import org.typelevel.log4cats.Logger -class TriplesGeneratorClientSpec extends AsyncWordSpec with CustomAsyncIOSpec with should.Matchers with ExternalServiceStubbing { +class TriplesGeneratorClientSpec + extends AsyncWordSpec + with CustomAsyncIOSpec + with should.Matchers + with ExternalServiceStubbing { private implicit val logger: Logger[IO] = TestLogger() private lazy val client = new TriplesGeneratorClientImpl[IO](Uri.unsafeFromString(externalServiceBaseUrl)) diff --git a/triples-generator/Dockerfile b/triples-generator/Dockerfile index 548c25c346..c872b5d806 100644 --- a/triples-generator/Dockerfile +++ b/triples-generator/Dockerfile @@ -45,7 +45,7 @@ USER tguser ENV PATH=$PATH:/home/tguser/.local/bin # Installing Renku -RUN python3 -m pip install 'renku==2.6.1' 'sentry-sdk==1.5.11' +RUN python3 -m pip install 'renku==2.6.2' 'sentry-sdk==1.5.11' RUN git config --global user.name 'renku' && \ git config --global user.email 'renku@renkulab.io' && \ diff --git a/triples-generator/src/main/resources/application.conf b/triples-generator/src/main/resources/application.conf index a7f3882a45..ecfbc517cc 100644 --- a/triples-generator/src/main/resources/application.conf +++ b/triples-generator/src/main/resources/application.conf @@ -19,7 +19,7 @@ triples-generation = "renku-log" # Defines expected version of the renku cli and the schema that is generated by the CLI. compatibility { # The expected version of CLI used by TS. - cli-version = "2.6.1" + cli-version = "2.6.2" # The expected version of the schema as returned by CLI. schema-version = "10" diff --git a/triples-generator/src/main/scala/io/renku/triplesgenerator/Microservice.scala b/triples-generator/src/main/scala/io/renku/triplesgenerator/Microservice.scala index 645a28f9ec..83b9fd12fa 100644 --- a/triples-generator/src/main/scala/io/renku/triplesgenerator/Microservice.scala +++ b/triples-generator/src/main/scala/io/renku/triplesgenerator/Microservice.scala @@ -68,14 +68,20 @@ object Microservice extends IOMicroservice { dbSessionPool <- Resource .eval(new TgLockDbConfigProvider[IO].map(SessionPoolResource[IO, TgLockDB])) .flatMap(identity) - } yield (config, dbSessionPool) + projectConnConfig <- Resource.eval(ProjectsConnectionConfig[IO](config)) + projectsSparql <- ProjectSparqlClient[IO](projectConnConfig) + } yield (config, dbSessionPool, projectsSparql) - resources.use { case (config, dbSessionPool) => - doRun(config, dbSessionPool) + resources.use { case (config, dbSessionPool, projectSparqlClient) => + doRun(config, dbSessionPool, projectSparqlClient) } } - private def doRun(config: Config, dbSessionPool: SessionResource[IO, TgLockDB]): IO[ExitCode] = for { + private def doRun( + config: Config, + dbSessionPool: SessionResource[IO, TgLockDB], + projectSparqlClient: ProjectSparqlClient[IO] + ): IO[ExitCode] = for { implicit0(mr: MetricsRegistry[IO]) <- MetricsRegistry[IO]() implicit0(sqtr: SparqlQueryTimeRecorder[IO]) <- SparqlQueryTimeRecorder[IO]() implicit0(gc: GitLabClient[IO]) <- GitLabClient[IO]() @@ -94,7 +100,7 @@ object Microservice extends IOMicroservice { sentryInitializer <- SentryInitializer[IO] cliVersionCompatChecker <- CliVersionCompatibilityChecker[IO](config) awaitingGenerationSubscription <- awaitinggeneration.SubscriptionFactory[IO] - membersSyncSubscription <- membersync.SubscriptionFactory[IO](tsWriteLock) + membersSyncSubscription <- membersync.SubscriptionFactory[IO](tsWriteLock, projectSparqlClient) triplesGeneratedSubscription <- triplesgenerated.SubscriptionFactory[IO](tsWriteLock) cleanUpSubscription <- cleanup.SubscriptionFactory[IO](tsWriteLock) minProjectInfoSubscription <- minprojectinfo.SubscriptionFactory[IO](tsWriteLock) diff --git a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/ProjectAuthSync.scala b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/ProjectAuthSync.scala new file mode 100644 index 0000000000..48998d4cec --- /dev/null +++ b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/ProjectAuthSync.scala @@ -0,0 +1,87 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.triplesgenerator.events.consumers + +import cats.MonadThrow +import cats.effect._ +import cats.syntax.all._ +import fs2.io.net.Network +import io.renku.graph.model.RenkuUrl +import io.renku.graph.model.projects.{Slug, Visibility} +import io.renku.projectauth.{ProjectAuthData, ProjectAuthService, ProjectMember} +import io.renku.triplesstore.ProjectsConnectionConfig +import io.renku.triplesstore.client.http.{RowDecoder, SparqlClient} +import io.renku.triplesstore.client.syntax._ +import org.typelevel.log4cats.Logger + +trait ProjectAuthSync[F[_]] { + def syncProject(slug: Slug, members: Set[ProjectMember]): F[Unit] + def syncProject(data: ProjectAuthData): F[Unit] +} + +object ProjectAuthSync { + + def resource[F[_]: Async: Logger: Network](cc: ProjectsConnectionConfig)(implicit renkuUrl: RenkuUrl) = + ProjectSparqlClient[F](cc).map(apply[F]) + + def apply[F[_]: Sync]( + sparqlClient: ProjectSparqlClient[F] + )(implicit renkuUrl: RenkuUrl): ProjectAuthSync[F] = + new Impl[F](ProjectAuthService[F](sparqlClient, renkuUrl), sparqlClient) + + private final class Impl[F[_]: Sync]( + projectAuthService: ProjectAuthService[F], + sparqlClient: ProjectSparqlClient[F] + ) extends ProjectAuthSync[F] { + private[this] val visibilityFinder: VisibilityFinder[F] = + new VisibilityFinder[F](sparqlClient) + + override def syncProject(slug: Slug, members: Set[ProjectMember]): F[Unit] = + visibilityFinder.find(slug).flatMap { + case Some(vis) => syncProject(ProjectAuthData(slug, members, vis)) + case None => ().pure[F] + } + + override def syncProject(data: ProjectAuthData): F[Unit] = + projectAuthService.update(data) + } + + // Hm, should we get this from gitlab? TODO + private final class VisibilityFinder[F[_]: MonadThrow](sparqlClient: SparqlClient[F]) { + def find(slug: Slug): F[Option[Visibility]] = + sparqlClient + .queryDecode[Visibility](sparql"""PREFIX schema: + |PREFIX renku: + | + |SELECT ?visibility + |WHERE { + | BIND (${slug.asObject} AS ?slug) + | Graph ?id { + | ?id a schema:Project; + | renku:projectPath ?slug; + | renku:projectVisibility ?visibility. + | } + |} + |""".stripMargin) + .map(_.headOption) + + implicit def decoder: RowDecoder[Visibility] = + RowDecoder.forProduct1[Visibility, Visibility]("visibility")(identity)(Visibility.jsonDecoder) + } +} diff --git a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/ProjectSparqlClient.scala b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/ProjectSparqlClient.scala new file mode 100644 index 0000000000..819ac0be51 --- /dev/null +++ b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/ProjectSparqlClient.scala @@ -0,0 +1,45 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.triplesgenerator.events.consumers + +import cats.effect._ +import fs2.io.net.Network +import io.renku.jsonld.JsonLD +import io.renku.triplesstore.ProjectsConnectionConfig +import io.renku.triplesstore.client.http.{Retry, SparqlClient, SparqlQuery, SparqlUpdate} +import org.typelevel.log4cats.Logger + +/** SparQL client fixed to the `projects` dataset. */ +trait ProjectSparqlClient[F[_]] extends SparqlClient[F] + +object ProjectSparqlClient { + def apply[F[_]: Network: Async: Logger]( + cc: ProjectsConnectionConfig, + retryCfg: Retry.RetryConfig = Retry.RetryConfig.default + ): Resource[F, ProjectSparqlClient[F]] = { + val cfg = cc.toCC(Some(retryCfg)) + SparqlClient[F](cfg).map(c => + new ProjectSparqlClient[F] { + override def update(request: SparqlUpdate) = c.update(request) + override def upload(data: JsonLD) = c.upload(data) + override def query(request: SparqlQuery) = c.query(request) + } + ) + } +} diff --git a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/EventHandler.scala b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/EventHandler.scala index aad9567101..96611a3068 100644 --- a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/EventHandler.scala +++ b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/EventHandler.scala @@ -21,6 +21,7 @@ package membersync import cats.effect.{Async, MonadCancelThrow} import cats.syntax.all._ +import fs2.io.net.Network import io.renku.events.{CategoryName, consumers} import io.renku.events.consumers.ProcessExecutor import io.renku.events.consumers.subscriptions.SubscriptionMechanism @@ -62,12 +63,15 @@ private object EventHandler { import eu.timepit.refined.auto._ - def apply[F[_]: Async: ReProvisioningStatus: GitLabClient: AccessTokenFinder: SparqlQueryTimeRecorder: Logger]( + def apply[F[ + _ + ]: Async: Network: ReProvisioningStatus: GitLabClient: AccessTokenFinder: SparqlQueryTimeRecorder: Logger]( subscriptionMechanism: SubscriptionMechanism[F], - tsWriteLock: TsWriteLock[F] + tsWriteLock: TsWriteLock[F], + projectSparqlClient: ProjectSparqlClient[F] ): F[consumers.EventHandler[F]] = for { tsReadinessChecker <- TSReadinessForEventsChecker[F] - membersSynchronizer <- MembersSynchronizer[F] + membersSynchronizer <- MembersSynchronizer[F](projectSparqlClient) processExecutor <- ProcessExecutor.concurrent(processesCount = 1) } yield new EventHandler[F]( categoryName, diff --git a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/KGSynchronizer.scala b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/KGSynchronizer.scala index 66f1ac668e..3808a7db0c 100644 --- a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/KGSynchronizer.scala +++ b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/KGSynchronizer.scala @@ -19,6 +19,7 @@ package io.renku.triplesgenerator.events.consumers.membersync import io.renku.graph.model.projects +import io.renku.triplesgenerator.gitlab.GitLabProjectMember private trait KGSynchronizer[F[_]] { def syncMembers(slug: projects.Slug, membersInGL: Set[GitLabProjectMember]): F[SyncSummary] @@ -29,7 +30,7 @@ private object KGSynchronizerFunctions { def findMembersToAdd(membersInGitLab: Set[GitLabProjectMember], membersInKG: Set[KGProjectMember] ): Set[GitLabProjectMember] = membersInGitLab.collect { - case member @ GitLabProjectMember(gitlabId, _) if !membersInKG.exists(_.gitLabId == gitlabId) => member + case member @ GitLabProjectMember(gitlabId, _, _) if !membersInKG.exists(_.gitLabId == gitlabId) => member } def findMembersToRemove(membersInGitLab: Set[GitLabProjectMember], diff --git a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/MembersSynchronizer.scala b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/MembersSynchronizer.scala index 1aef956f85..f7f874d309 100644 --- a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/MembersSynchronizer.scala +++ b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/MembersSynchronizer.scala @@ -26,6 +26,8 @@ import io.renku.graph.tokenrepository.AccessTokenFinder import io.renku.http.client.{AccessToken, GitLabClient} import io.renku.logging.ExecutionTimeRecorder import io.renku.logging.ExecutionTimeRecorder.ElapsedTime +import io.renku.triplesgenerator.events.consumers.ProjectSparqlClient +import io.renku.triplesgenerator.gitlab.GitLabProjectMembersFinder import io.renku.triplesstore._ import org.typelevel.log4cats.Logger @@ -69,10 +71,12 @@ private class MembersSynchronizerImpl[F[_]: MonadThrow: AccessTokenFinder: Logge } private object MembersSynchronizer { - def apply[F[_]: Async: GitLabClient: AccessTokenFinder: Logger: SparqlQueryTimeRecorder]: F[MembersSynchronizer[F]] = + def apply[F[_]: Async: GitLabClient: AccessTokenFinder: Logger: SparqlQueryTimeRecorder]( + projectSparqlClient: ProjectSparqlClient[F] + ): F[MembersSynchronizer[F]] = for { gitLabProjectMembersFinder <- GitLabProjectMembersFinder[F] - kgSynchronizer <- namedgraphs.KGSynchronizer[F] + kgSynchronizer <- namedgraphs.KGSynchronizer[F](projectSparqlClient) executionTimeRecorder <- ExecutionTimeRecorder[F](maybeHistogram = None) } yield new MembersSynchronizerImpl[F](gitLabProjectMembersFinder, kgSynchronizer, executionTimeRecorder) } diff --git a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/SubscriptionFactory.scala b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/SubscriptionFactory.scala index 5f8d6238b8..f2984585ed 100644 --- a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/SubscriptionFactory.scala +++ b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/SubscriptionFactory.scala @@ -20,6 +20,7 @@ package io.renku.triplesgenerator.events.consumers.membersync import cats.effect.Async import cats.syntax.all._ +import fs2.io.net.Network import io.renku.events.consumers import io.renku.events.consumers.subscriptions.SubscriptionMechanism import io.renku.events.consumers.subscriptions.SubscriptionPayloadComposer.defaultSubscriptionPayloadComposerFactory @@ -27,14 +28,18 @@ import io.renku.graph.tokenrepository.AccessTokenFinder import io.renku.http.client.GitLabClient import io.renku.triplesgenerator.Microservice import io.renku.triplesgenerator.TgLockDB.TsWriteLock +import io.renku.triplesgenerator.events.consumers.ProjectSparqlClient import io.renku.triplesgenerator.events.consumers.tsmigrationrequest.migrations.reprovisioning.ReProvisioningStatus import io.renku.triplesstore.SparqlQueryTimeRecorder import org.typelevel.log4cats.Logger object SubscriptionFactory { - def apply[F[_]: Async: Logger: ReProvisioningStatus: GitLabClient: AccessTokenFinder: SparqlQueryTimeRecorder]( - tsWriteLock: TsWriteLock[F] + def apply[F[ + _ + ]: Async: Network: Logger: ReProvisioningStatus: GitLabClient: AccessTokenFinder: SparqlQueryTimeRecorder]( + tsWriteLock: TsWriteLock[F], + projectSparqlClient: ProjectSparqlClient[F] ): F[(consumers.EventHandler[F], SubscriptionMechanism[F])] = for { subscriptionMechanism <- SubscriptionMechanism( @@ -42,6 +47,6 @@ object SubscriptionFactory { defaultSubscriptionPayloadComposerFactory(Microservice.ServicePort, Microservice.Identifier) ) _ <- ReProvisioningStatus[F].registerForNotification(subscriptionMechanism) - handler <- EventHandler[F](subscriptionMechanism, tsWriteLock) + handler <- EventHandler[F](subscriptionMechanism, tsWriteLock, projectSparqlClient) } yield handler -> subscriptionMechanism } diff --git a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/namedgraphs/KGPersonFinder.scala b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/namedgraphs/KGPersonFinder.scala index 1a501c5422..9e7f3968a4 100644 --- a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/namedgraphs/KGPersonFinder.scala +++ b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/namedgraphs/KGPersonFinder.scala @@ -25,6 +25,7 @@ import io.renku.graph.model.Schemas.schema import io.renku.graph.model.entities.Person import io.renku.graph.model.persons.{GitLabId, ResourceId} import io.renku.graph.model.{GraphClass, persons} +import io.renku.triplesgenerator.gitlab.GitLabProjectMember import io.renku.triplesstore.ResultsDecoder._ import io.renku.triplesstore.SparqlQuery.Prefixes import io.renku.triplesstore._ @@ -69,6 +70,11 @@ private class KGPersonFinderImpl[F[_]: Async: Logger: SparqlQueryTimeRecorder]( } private object KGPersonFinder { + def apply[F[_]: Async: Logger: SparqlQueryTimeRecorder]( + projectsConnectionConfig: ProjectsConnectionConfig + ): KGPersonFinder[F] = + new KGPersonFinderImpl(projectsConnectionConfig) + def apply[F[_]: Async: Logger: SparqlQueryTimeRecorder]: F[KGPersonFinder[F]] = ProjectsConnectionConfig[F]().map(new KGPersonFinderImpl(_)) } diff --git a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/namedgraphs/KGProjectMembersFinder.scala b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/namedgraphs/KGProjectMembersFinder.scala index 00f6dbb369..a84c1fef27 100644 --- a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/namedgraphs/KGProjectMembersFinder.scala +++ b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/namedgraphs/KGProjectMembersFinder.scala @@ -75,6 +75,14 @@ private class KGProjectMembersFinderImpl[F[_]: Async: Logger: SparqlQueryTimeRec } private object KGProjectMembersFinder { + def apply[F[_]: Async: Logger: SparqlQueryTimeRecorder]( + connectionConfig: ProjectsConnectionConfig, + renkuUrl: RenkuUrl + ): KGProjectMembersFinder[F] = { + implicit val url: RenkuUrl = renkuUrl + new KGProjectMembersFinderImpl[F](connectionConfig) + } + def apply[F[_]: Async: Logger: SparqlQueryTimeRecorder]: F[KGProjectMembersFinder[F]] = for { connectionConfig <- ProjectsConnectionConfig[F]() implicit0(renkuUrl: RenkuUrl) <- RenkuUrlLoader[F]() diff --git a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/namedgraphs/KGSynchronizer.scala b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/namedgraphs/KGSynchronizer.scala index 223af2da22..66c20ecc04 100644 --- a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/namedgraphs/KGSynchronizer.scala +++ b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/namedgraphs/KGSynchronizer.scala @@ -20,27 +20,36 @@ package io.renku.triplesgenerator.events.consumers.membersync package namedgraphs import cats.MonadThrow -import cats.effect.Async +import cats.effect._ import cats.syntax.all._ -import io.renku.graph.model.projects +import io.renku.graph.config.RenkuUrlLoader +import io.renku.graph.model.{RenkuUrl, projects} +import io.renku.triplesgenerator.events.consumers.{ProjectAuthSync, ProjectSparqlClient} +import io.renku.triplesgenerator.gitlab.GitLabProjectMember import io.renku.triplesstore._ import org.typelevel.log4cats.Logger private[membersync] object KGSynchronizer { - def apply[F[_]: Async: Logger: SparqlQueryTimeRecorder]: F[KGSynchronizer[F]] = + def apply[F[_]: Async: Logger: SparqlQueryTimeRecorder]( + projectSparqlClient: ProjectSparqlClient[F] + ): F[KGSynchronizer[F]] = for { - kgProjectMembersFinder <- KGProjectMembersFinder[F] - kgPersonFinder <- KGPersonFinder[F] - updatesCreator <- UpdatesCreator[F] - connectionConfig <- ProjectsConnectionConfig[F]() - tsClient <- TSClient[F](connectionConfig).pure[F] - } yield new KGSynchronizerImpl[F](kgProjectMembersFinder, kgPersonFinder, updatesCreator, tsClient) + implicit0(renkuUrl: RenkuUrl) <- RenkuUrlLoader[F]() + projectConnectionCfg <- ProjectsConnectionConfig[F]() + kgProjectMembersFinder = KGProjectMembersFinder[F](projectConnectionCfg, renkuUrl) + kgPersonFinder = KGPersonFinder[F](projectConnectionCfg) + updatesCreator <- UpdatesCreator[F] + tsClient = TSClient[F](projectConnectionCfg) + projectAuthSync = ProjectAuthSync[F](projectSparqlClient) + } yield new KGSynchronizerImpl[F](kgProjectMembersFinder, kgPersonFinder, updatesCreator, projectAuthSync, tsClient) } -private class KGSynchronizerImpl[F[_]: MonadThrow](kgMembersFinder: KGProjectMembersFinder[F], - kgPersonFinder: KGPersonFinder[F], - updatesCreator: UpdatesCreator, - tsClient: TSClient[F] +private class KGSynchronizerImpl[F[_]: MonadThrow]( + kgMembersFinder: KGProjectMembersFinder[F], + kgPersonFinder: KGPersonFinder[F], + updatesCreator: UpdatesCreator, + projectAuthSync: ProjectAuthSync[F], + tsClient: TSClient[F] ) extends KGSynchronizer[F] { import KGSynchronizerFunctions._ @@ -52,5 +61,7 @@ private class KGSynchronizerImpl[F[_]: MonadThrow](kgMembersFinder: KGProjectMem membersToRemove = findMembersToRemove(membersInGL, membersInKG) removalUpdates = updatesCreator.removal(slug, membersToRemove) _ <- (insertionUpdates ::: removalUpdates).map(tsClient.updateWithNoResult).sequence + + _ <- projectAuthSync.syncProject(slug, membersInGL.map(_.toProjectAuthMember)) } yield SyncSummary(membersAdded = membersToAdd.size, membersRemoved = membersToRemove.size) } diff --git a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/namedgraphs/UpdatesCreator.scala b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/namedgraphs/UpdatesCreator.scala index 4b29af89bd..db9d6ce5dc 100644 --- a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/namedgraphs/UpdatesCreator.scala +++ b/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/namedgraphs/UpdatesCreator.scala @@ -29,6 +29,7 @@ import io.renku.graph.model.entities.Person import io.renku.graph.model.persons.GitLabId import io.renku.graph.model.views.RdfResource import io.renku.graph.model.views.SparqlLiteralEncoder.sparqlEncode +import io.renku.triplesgenerator.gitlab.GitLabProjectMember import io.renku.triplesstore.SparqlQuery import io.renku.triplesstore.SparqlQuery.Prefixes diff --git a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/GitLabProjectMembersFinder.scala b/triples-generator/src/main/scala/io/renku/triplesgenerator/gitlab/GitLabProjectMembersFinder.scala similarity index 86% rename from triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/GitLabProjectMembersFinder.scala rename to triples-generator/src/main/scala/io/renku/triplesgenerator/gitlab/GitLabProjectMembersFinder.scala index 39d6c9e07f..caae578b8e 100644 --- a/triples-generator/src/main/scala/io/renku/triplesgenerator/events/consumers/membersync/GitLabProjectMembersFinder.scala +++ b/triples-generator/src/main/scala/io/renku/triplesgenerator/gitlab/GitLabProjectMembersFinder.scala @@ -16,7 +16,7 @@ * limitations under the License. */ -package io.renku.triplesgenerator.events.consumers.membersync +package io.renku.triplesgenerator.gitlab import cats.effect.Async import cats.syntax.all._ @@ -27,15 +27,16 @@ import io.circe.Decoder import io.renku.graph.model.persons.{GitLabId, Name} import io.renku.graph.model.projects.Slug import io.renku.http.client.{AccessToken, GitLabClient} +import io.renku.projectauth.ProjectMember import io.renku.tinytypes.json.TinyTypeDecoders._ import org.http4s.Status.{Forbidden, NotFound, Ok, Unauthorized} import org.http4s._ -import org.http4s.circe.jsonOf +import org.http4s.circe.CirceEntityDecoder._ import org.http4s.implicits.http4sLiteralsSyntax import org.typelevel.ci._ import org.typelevel.log4cats.Logger -private trait GitLabProjectMembersFinder[F[_]] { +trait GitLabProjectMembersFinder[F[_]] { def findProjectMembers(slug: Slug)(implicit maybeAccessToken: Option[AccessToken]): F[Set[GitLabProjectMember]] } @@ -90,21 +91,16 @@ private class GitLabProjectMembersFinderImpl[F[_]: Async: GitLabClient: Logger] private def maybeNextPage(response: Response[F]): Option[Int] = response.headers.get(ci"X-Next-Page").flatMap(_.head.value.toIntOption) - private implicit lazy val projectDecoder: EntityDecoder[F, List[GitLabProjectMember]] = { - import io.renku.graph.model.persons - - implicit val decoder: Decoder[GitLabProjectMember] = { cursor => - (cursor.downField("id").as[GitLabId] -> cursor.downField("name").as[persons.Name]) - .mapN(GitLabProjectMember) - } - - jsonOf[F, List[GitLabProjectMember]] - } + private implicit val memberDecoder: Decoder[GitLabProjectMember] = + Decoder.forProduct3("id", "name", "access_level")(GitLabProjectMember.apply) } -private object GitLabProjectMembersFinder { +object GitLabProjectMembersFinder { def apply[F[_]: Async: GitLabClient: Logger]: F[GitLabProjectMembersFinder[F]] = new GitLabProjectMembersFinderImpl[F].pure[F].widen[GitLabProjectMembersFinder[F]] } -private final case class GitLabProjectMember(gitLabId: GitLabId, name: Name) +final case class GitLabProjectMember(gitLabId: GitLabId, name: Name, accessLevel: Int) { + def toProjectAuthMember: ProjectMember = + ProjectMember.fromGitLabData(gitLabId, accessLevel) +} diff --git a/triples-generator/src/test/scala/io/renku/triplesgenerator/DatasetProvision.scala b/triples-generator/src/test/scala/io/renku/triplesgenerator/DatasetProvision.scala new file mode 100644 index 0000000000..71ca86691d --- /dev/null +++ b/triples-generator/src/test/scala/io/renku/triplesgenerator/DatasetProvision.scala @@ -0,0 +1,43 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.triplesgenerator + +import cats.effect.IO +import io.renku.entities.searchgraphs.SearchInfoDatasets +import io.renku.graph.model.entities.EntityFunctions +import io.renku.graph.model.projects.Role +import io.renku.graph.model.{RenkuUrl, entities} +import io.renku.projectauth.ProjectMember +import io.renku.triplesgenerator.events.consumers.{ProjectAuthSync, ProjectSparqlClient} +import io.renku.triplesstore.{GraphsProducer, InMemoryJena, ProjectsDataset} + +trait DatasetProvision extends SearchInfoDatasets { self: ProjectsDataset with InMemoryJena => + + override def provisionProject( + project: entities.Project + )(implicit + entityFunctions: EntityFunctions[entities.Project], + graphsProducer: GraphsProducer[entities.Project], + renkuUrl: RenkuUrl + ): IO[Unit] = { + val ps = ProjectSparqlClient[IO](projectsDSConnectionInfo).map(ProjectAuthSync[IO](_)) + val members = project.members.flatMap(p => p.maybeGitLabId.map(id => ProjectMember(id, Role.Reader))) + super.provisionProject(project) *> ps.use(_.syncProject(project.slug, members)) + } +} diff --git a/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/membersync/Generators.scala b/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/membersync/Generators.scala index 56218a7a2a..b795d66737 100644 --- a/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/membersync/Generators.scala +++ b/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/membersync/Generators.scala @@ -21,17 +21,12 @@ package io.renku.triplesgenerator.events.consumers.membersync import cats.syntax.all._ import io.renku.generators.Generators.Implicits._ import io.renku.generators.Generators.positiveInts -import io.renku.graph.model.GraphModelGenerators.{personGitLabIds, personNames, personResourceIds} +import io.renku.graph.model.GraphModelGenerators.{personGitLabIds, personResourceIds} import io.renku.graph.model.RenkuUrl import org.scalacheck.Gen private object Generators { - implicit val gitLabProjectMembers: Gen[GitLabProjectMember] = for { - id <- personGitLabIds - name <- personNames - } yield GitLabProjectMember(id, name) - implicit def kgProjectMembers(implicit renkuUrl: RenkuUrl): Gen[KGProjectMember] = for { memberId <- personResourceIds gitLabId <- personGitLabIds diff --git a/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/membersync/MembersSynchronizerSpec.scala b/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/membersync/MembersSynchronizerSpec.scala index 13a8e6c085..d4d5d94887 100644 --- a/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/membersync/MembersSynchronizerSpec.scala +++ b/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/membersync/MembersSynchronizerSpec.scala @@ -33,6 +33,8 @@ import io.renku.interpreters.TestLogger.Level.{Error, Info} import io.renku.logging.TestExecutionTimeRecorder import io.renku.testtools.IOSpec import io.renku.triplesgenerator.events.consumers.membersync.Generators._ +import io.renku.triplesgenerator.gitlab.Generators._ +import io.renku.triplesgenerator.gitlab.{GitLabProjectMember, GitLabProjectMembersFinder} import org.scalamock.scalatest.MockFactory import org.scalatest.matchers.should import org.scalatest.wordspec.AnyWordSpec diff --git a/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/membersync/namedgraphs/KGPersonFinderSpec.scala b/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/membersync/namedgraphs/KGPersonFinderSpec.scala index 7be6eb84fa..3d67050501 100644 --- a/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/membersync/namedgraphs/KGPersonFinderSpec.scala +++ b/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/membersync/namedgraphs/KGPersonFinderSpec.scala @@ -19,7 +19,6 @@ package io.renku.triplesgenerator.events.consumers.membersync package namedgraphs -import Generators.gitLabProjectMembers import cats.effect.IO import cats.syntax.all._ import io.renku.generators.Generators.Implicits._ @@ -29,6 +28,7 @@ import io.renku.interpreters.TestLogger import io.renku.logging.TestSparqlQueryTimeRecorder import io.renku.testtools.IOSpec import io.renku.triplesstore.{InMemoryJenaForSpec, ProjectsDataset, SparqlQueryTimeRecorder} +import io.renku.triplesgenerator.gitlab.Generators._ import org.scalatest.matchers.should import org.scalatest.wordspec.AnyWordSpec import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks diff --git a/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/membersync/namedgraphs/KGSynchronizerSpec.scala b/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/membersync/namedgraphs/KGSynchronizerSpec.scala index 77e6a5518a..4c3785d7aa 100644 --- a/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/membersync/namedgraphs/KGSynchronizerSpec.scala +++ b/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/membersync/namedgraphs/KGSynchronizerSpec.scala @@ -26,6 +26,10 @@ import io.renku.generators.Generators.Implicits._ import io.renku.generators.Generators._ import io.renku.graph.model.GraphModelGenerators._ import io.renku.graph.model.{RenkuUrl, projects} +import io.renku.projectauth.ProjectMember +import io.renku.triplesgenerator.events.consumers.ProjectAuthSync +import io.renku.triplesgenerator.gitlab.GitLabProjectMember +import io.renku.triplesgenerator.gitlab.Generators._ import io.renku.triplesstore.TSClient import org.scalamock.scalatest.MockFactory import org.scalatest.matchers.should @@ -75,6 +79,11 @@ class KGSynchronizerSpec extends AnyWordSpec with MockFactory with should.Matche .returning(().pure[Try]) } + (projectAuthSync + .syncProject(_: projects.Slug, _: Set[ProjectMember])) + .expects(projectSlug, membersInGitLab.map(_.toProjectAuthMember)) + .returning(().pure[Try]) + synchronizer.syncMembers(projectSlug, membersInGitLab) shouldBe SyncSummary(missingMembersWithIds.size, membersToRemove.size).pure[Try] } @@ -98,7 +107,9 @@ class KGSynchronizerSpec extends AnyWordSpec with MockFactory with should.Matche val kgPersonFinder = mock[KGPersonFinder[Try]] val updatesCreator = mock[UpdatesCreator] val tsClient = mock[TSClient[Try]] + val projectAuthSync = mock[ProjectAuthSync[Try]] - val synchronizer = new KGSynchronizerImpl[Try](kgProjectMembersFinder, kgPersonFinder, updatesCreator, tsClient) + val synchronizer = + new KGSynchronizerImpl[Try](kgProjectMembersFinder, kgPersonFinder, updatesCreator, projectAuthSync, tsClient) } } diff --git a/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/membersync/namedgraphs/UpdatesCreatorSpec.scala b/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/membersync/namedgraphs/UpdatesCreatorSpec.scala index 6a0d012b47..4de9ae3c15 100644 --- a/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/membersync/namedgraphs/UpdatesCreatorSpec.scala +++ b/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/membersync/namedgraphs/UpdatesCreatorSpec.scala @@ -32,6 +32,7 @@ import io.renku.graph.model.views.RdfResource import io.renku.graph.model.{GraphClass, persons, projects} import io.renku.testtools.IOSpec import io.renku.triplesgenerator.events.consumers.membersync.PersonOps._ +import io.renku.triplesgenerator.gitlab.Generators._ import io.renku.triplesstore.SparqlQuery.Prefixes import io.renku.triplesstore.{InMemoryJenaForSpec, ProjectsDataset, SparqlQuery} import org.scalatest.matchers.should diff --git a/triples-generator/src/test/scala/io/renku/triplesgenerator/gitlab/Generators.scala b/triples-generator/src/test/scala/io/renku/triplesgenerator/gitlab/Generators.scala new file mode 100644 index 0000000000..d26adc7228 --- /dev/null +++ b/triples-generator/src/test/scala/io/renku/triplesgenerator/gitlab/Generators.scala @@ -0,0 +1,34 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.triplesgenerator.gitlab + +import io.renku.graph.model.GraphModelGenerators.{personGitLabIds, personNames} +import io.renku.graph.model.RenkuTinyTypeGenerators +import io.renku.graph.model.projects.Role +import org.scalacheck.Gen + +object Generators { + + implicit val gitLabProjectMembers: Gen[GitLabProjectMember] = for { + id <- personGitLabIds + name <- personNames + role <- RenkuTinyTypeGenerators.roleGen + } yield GitLabProjectMember(id, name, Role.toGitLabAccessLevel(role)) + +} diff --git a/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/membersync/GitLabProjectMembersFinderSpec.scala b/triples-generator/src/test/scala/io/renku/triplesgenerator/gitlab/GitLabProjectMembersFinderSpec.scala similarity index 97% rename from triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/membersync/GitLabProjectMembersFinderSpec.scala rename to triples-generator/src/test/scala/io/renku/triplesgenerator/gitlab/GitLabProjectMembersFinderSpec.scala index 74954be9e2..a4adf598c0 100644 --- a/triples-generator/src/test/scala/io/renku/triplesgenerator/events/consumers/membersync/GitLabProjectMembersFinderSpec.scala +++ b/triples-generator/src/test/scala/io/renku/triplesgenerator/gitlab/GitLabProjectMembersFinderSpec.scala @@ -16,7 +16,7 @@ * limitations under the License. */ -package io.renku.triplesgenerator.events.consumers.membersync +package io.renku.triplesgenerator.gitlab /* * Copyright 2021 Swiss Data Science Center (SDSC) @@ -55,7 +55,7 @@ import io.renku.http.tinytypes.TinyTypeURIEncoder._ import io.renku.interpreters.TestLogger import io.renku.stubbing.ExternalServiceStubbing import io.renku.testtools.{GitLabClientTools, IOSpec} -import io.renku.triplesgenerator.events.consumers.membersync.Generators._ +import io.renku.triplesgenerator.gitlab.Generators._ import org.http4s.Status.{Forbidden, Unauthorized} import org.http4s.implicits.http4sLiteralsSyntax import org.http4s.{Header, Headers, Request, Response, Status, Uri} @@ -195,7 +195,8 @@ class GitLabProjectMembersFinderSpec json"""{ "id": ${member.gitLabId.value}, "username": ${member.name.value}, - "name": ${member.name.value} + "name": ${member.name.value}, + "access_level": ${member.accessLevel} }""" } } diff --git a/triples-store-client/build.sbt b/triples-store-client/build.sbt index ecd1fd9b62..1b2add3fd5 100644 --- a/triples-store-client/build.sbt +++ b/triples-store-client/build.sbt @@ -22,9 +22,13 @@ name := "triples-store-client" libraryDependencies ++= Dependencies.jsonld4s ++ Dependencies.luceneQueryParser ++ - Dependencies.rdf4jQueryParserSparql + Dependencies.rdf4jQueryParserSparql ++ + Dependencies.http4sClient ++ + Dependencies.http4sCirce libraryDependencies ++= (Dependencies.scalacheck ++ Dependencies.scalatest ++ + Dependencies.catsEffectScalaTest ++ + Dependencies.testContainersScalaTest ++ Dependencies.scalatestScalaCheck).map(_ % Test) diff --git a/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/ConnectionConfig.scala b/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/ConnectionConfig.scala new file mode 100644 index 0000000000..da3ac632f5 --- /dev/null +++ b/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/ConnectionConfig.scala @@ -0,0 +1,27 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.triplesstore.client.http + +import org.http4s.{BasicCredentials, Uri} + +final case class ConnectionConfig( + baseUrl: Uri, + basicAuth: Option[BasicCredentials], + retry: Option[Retry.RetryConfig] +) diff --git a/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/ConnectionError.scala b/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/ConnectionError.scala new file mode 100644 index 0000000000..1f72f5fb1a --- /dev/null +++ b/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/ConnectionError.scala @@ -0,0 +1,45 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.triplesstore.client.http + +import org.http4s.client.ConnectionFailure +import org.http4s.ember.core.EmberException + +import java.io.IOException +import java.net.{ConnectException, SocketException, UnknownHostException} +import java.nio.channels.ClosedChannelException + +object ConnectionError { + + def exists(ex: Throwable): Boolean = + unapply(ex).isDefined + + def unapply(ex: Throwable): Option[Throwable] = + ex match { + case _: ConnectionFailure | _: ConnectException | _: SocketException | _: UnknownHostException => + Some(ex) + case _: IOException + if ex.getMessage.toLowerCase + .contains("connection reset") || ex.getMessage.toLowerCase.contains("broken pipe") => + Some(ex) + case _: EmberException.ReachedEndOfStream => Some(ex) + case _: ClosedChannelException => Some(ex) + case _ => None + } +} diff --git a/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/DefaultFusekiClient.scala b/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/DefaultFusekiClient.scala new file mode 100644 index 0000000000..a5638e5d2f --- /dev/null +++ b/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/DefaultFusekiClient.scala @@ -0,0 +1,72 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.triplesstore.client.http + +import cats.effect._ +import cats.syntax.all._ +import org.http4s.Method.{DELETE, POST} +import org.http4s.client.Client +import org.http4s.client.dsl.Http4sClientDsl +import org.http4s.headers.Accept +import org.http4s.{MediaType, UrlForm} +import org.typelevel.log4cats.Logger + +final class DefaultFusekiClient[F[_]: Async: Logger]( + client: Client[F], + cc: ConnectionConfig +) extends FusekiClient[F] + with Http4sClientDsl[F] + with MoreClientDsl[F] { + + private[this] val retry = cc.retry.map(Retry.apply[F]) + private[this] val datasetsUri = cc.baseUrl / "$" / "datasets" + + override def sparql(datasetName: String): SparqlClient[F] = { + val subConfig = cc.copy(baseUrl = cc.baseUrl / datasetName) + new DefaultSparqlClient[F](client, subConfig) + } + + override def createDataset(name: String, persistent: Boolean): F[Unit] = + retry.fold(createDataset0(name, persistent))(_.retryConnectionError(createDataset0(name, persistent))) + + private def createDataset0(name: String, persistent: Boolean): F[Unit] = { + val dbType = if (persistent) "tdb" else "mem" + val req = + POST(datasetsUri) + .putHeaders(Accept(MediaType.application.json)) + .withBasicAuth(cc.basicAuth) + .withEntity(UrlForm("dbType" -> dbType, "dbName" -> name)) + + client.run(req).use { resp => + if (resp.status.isSuccess) ().pure[F] + else SparqlRequestError(s"createDataset($name)", resp).flatMap(Async[F].raiseError) + } + } + + override def deleteDataset(name: String): F[Unit] = + retry.fold(deleteDataset0(name))(_.retryConnectionError(deleteDataset0(name))) + + private def deleteDataset0(name: String): F[Unit] = { + val req = DELETE(datasetsUri / name).withBasicAuth(cc.basicAuth) + client.run(req).use { resp => + if (resp.status.isSuccess) ().pure[F] + else SparqlRequestError(s"deleteDataset($name)", resp).flatMap(Async[F].raiseError) + } + } +} diff --git a/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/DefaultSparqlClient.scala b/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/DefaultSparqlClient.scala new file mode 100644 index 0000000000..f9eb198703 --- /dev/null +++ b/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/DefaultSparqlClient.scala @@ -0,0 +1,90 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.triplesstore.client.http + +import cats.effect._ +import cats.syntax.all._ +import io.circe.Json +import io.renku.jsonld.JsonLD +import org.http4s.MediaType +import org.http4s.Method.POST +import org.http4s.circe.CirceEntityCodec._ +import org.http4s.client.Client +import org.http4s.client.dsl.Http4sClientDsl +import org.http4s.headers.{Accept, `Content-Type`} +import org.http4s.implicits._ +import org.typelevel.log4cats.Logger + +final class DefaultSparqlClient[F[_]: Async: Logger](client: Client[F], config: ConnectionConfig) + extends SparqlClient[F] + with Http4sClientDsl[F] + with MoreClientDsl[F] { + + private[this] val retry = config.retry.map(Retry.apply[F]) + private[this] val sparqlResultsJson: MediaType = mediaType"application/sparql-results+json" + + override def update(request: SparqlUpdate): F[Unit] = + retry.fold(update0(request))(_.retryConnectionError(update0(request))) + + private def update0(request: SparqlUpdate): F[Unit] = { + val req = + POST(config.baseUrl / "update") + .putHeaders(Accept(sparqlResultsJson)) + .withBasicAuth(config.basicAuth) + .withEntity(request) + + client.run(req).use { resp => + if (resp.status.isSuccess) ().pure[F] + else SparqlRequestError(request.render, resp).flatMap(Async[F].raiseError) + } + } + + override def upload(data: JsonLD): F[Unit] = + retry.fold(upload0(data))(_.retryConnectionError(upload0(data))) + + private def upload0(data: JsonLD): F[Unit] = { + val req = + POST(config.baseUrl / "data") + .putHeaders(Accept(sparqlResultsJson)) + .withBasicAuth(config.basicAuth) + .withEntity(data.toJson) + .withContentType(`Content-Type`(MediaType.application.`ld+json`)) + + client.run(req).use { resp => + if (resp.status.isSuccess) ().pure[F] + else SparqlRequestError(data.toJson.noSpaces, resp).flatMap(Async[F].raiseError) + } + } + + override def query(request: SparqlQuery): F[Json] = + retry.fold(query0(request))(_.retryConnectionError(query0(request))) + + private def query0(request: SparqlQuery): F[Json] = { + val req = + POST(config.baseUrl / "query") + .addHeader(Accept(sparqlResultsJson)) + .withBasicAuth(config.basicAuth) + .withEntity(request) + + client.run(req).use { resp => + if (resp.status.isSuccess) resp.as[Json] + else SparqlRequestError(request.render, resp).flatMap(Async[F].raiseError) + } + } +} diff --git a/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/FusekiClient.scala b/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/FusekiClient.scala new file mode 100644 index 0000000000..1493f81963 --- /dev/null +++ b/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/FusekiClient.scala @@ -0,0 +1,50 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.triplesstore.client.http + +import cats.effect._ +import fs2.io.net.Network +import org.http4s.ember.client.EmberClientBuilder +import org.typelevel.log4cats.Logger + +import scala.concurrent.duration._ + +/** A SPARQL client with additional features for Apache Jena Fuseki servers. */ +trait FusekiClient[F[_]] { + + def createDataset(name: String, persistent: Boolean): F[Unit] + + def deleteDataset(name: String): F[Unit] + + def sparql(datasetName: String): SparqlClient[F] +} + +object FusekiClient { + def apply[F[_]: Async: Network: Logger]( + connectionConfig: ConnectionConfig, + timeout: Duration = 20.minutes + ): Resource[F, FusekiClient[F]] = + EmberClientBuilder + .default[F] + .withTimeout(timeout) + .build + .map { c => + new DefaultFusekiClient[F](c, connectionConfig) + } +} diff --git a/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/MoreClientDsl.scala b/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/MoreClientDsl.scala new file mode 100644 index 0000000000..9fb21081b5 --- /dev/null +++ b/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/MoreClientDsl.scala @@ -0,0 +1,30 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.triplesstore.client.http + +import org.http4s.{BasicCredentials, Request} +import org.http4s.headers.Authorization + +trait MoreClientDsl[F[_]] { + + final implicit class MoreRequestDsl(req: Request[F]) { + def withBasicAuth(cred: Option[BasicCredentials]): Request[F] = + cred.map(c => req.putHeaders(Authorization(c))).getOrElse(req) + } +} diff --git a/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/Retry.scala b/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/Retry.scala new file mode 100644 index 0000000000..1ca08f0ae6 --- /dev/null +++ b/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/Retry.scala @@ -0,0 +1,104 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.triplesstore.client.http + +import cats.MonadThrow +import cats.effect._ +import cats.kernel.Monoid +import cats.syntax.all._ +import fs2.Stream +import io.renku.triplesstore.client.http.Retry.Context +import org.typelevel.log4cats.Logger + +import scala.concurrent.duration._ + +final class Retry[F[_]: Logger: Temporal: MonadThrow](cfg: Retry.RetryConfig) { + private[this] val logger: Logger[F] = Logger[F] + private[this] val F = MonadThrow[F] + + def retryWhen[A](filter: Throwable => Boolean)(fa: F[A]): F[A] = { + val waits = Stream.awakeDelay(cfg.interval).void + + val tries = + (Stream.eval(fa.attempt) ++ + waits + .zip(Stream.repeatEval(fa.attempt)) + .map(_._2)).zipWithIndex.take(cfg.maxRetries) + + val result = + tries + .flatMap { + case (Right(v), _) => Stream.emit(Context.success(v)) + case (Left(ex), currentTry) if filter(ex) => + Stream + .eval(logger.info(s"Failing with ${ex.getMessage}, trying again $currentTry/${cfg.maxRetries}")) + .as(Context.failed[A](ex)) + + case (Left(ex), _) => + Stream.raiseError(ex) + } + .takeThrough(_.valueAbsent) + .compile + .foldMonoid + + result.map(_.toEither).flatMap { + case Right(v) => v.pure[F] + case Left(errs) => F.raiseError(Retry.RetryExceeded(cfg, errs)) + } + } + + def retryConnectionError[A](fa: F[A]): F[A] = + retryWhen(ConnectionError.exists)(fa) +} + +object Retry { + final case class RetryConfig( + interval: FiniteDuration, + maxRetries: Int + ) + + object RetryConfig { + val default: RetryConfig = RetryConfig(10.seconds, 10) + } + + final case class RetryExceeded(cfg: RetryConfig, errors: List[Throwable]) + extends RuntimeException( + s"Fail after trying ${cfg.maxRetries} times at ${cfg.interval} interval", + errors.headOption.orNull + ) { + override def fillInStackTrace() = this + } + + def apply[F[_]: Logger: Temporal: MonadThrow](cfg: RetryConfig): Retry[F] = + new Retry[F](cfg) + + private final case class Context[A](value: Option[A], errors: List[Throwable]) { + def valueAbsent: Boolean = value.isEmpty + def toEither: Either[List[Throwable], A] = value.toRight(errors) + private def merge(c: Context[A]): Context[A] = + Context(value.orElse(c.value), c.errors ::: errors) + } + private object Context { + def success[A](v: A): Context[A] = Context(v.some, Nil) + def failed[A](ex: Throwable): Context[A] = Context(None, List(ex)) + + implicit def monoid[A]: Monoid[Context[A]] = + Monoid.instance[Context[A]](Context(None, Nil), _ merge _) + } +} diff --git a/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/RowDecoder.scala b/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/RowDecoder.scala new file mode 100644 index 0000000000..e109038110 --- /dev/null +++ b/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/RowDecoder.scala @@ -0,0 +1,59 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.triplesstore.client.http + +import cats.syntax.all._ +import io.circe.{Decoder, HCursor} + +trait RowDecoder[A] extends Decoder[A] + +object RowDecoder { + + def apply[A](implicit d: RowDecoder[A]): RowDecoder[A] = d + + def fromDecoder[A](d: Decoder[A]): RowDecoder[A] = + (c: HCursor) => d.apply(c) + + private def prop[A: Decoder](name: String): Decoder[A] = + Decoder.instance { cursor => + cursor.downField(name).downField("value").as[A] + } + + def forProduct1[T, A0](name: String)(f: A0 => T)(implicit d: Decoder[A0]): RowDecoder[T] = + fromDecoder(prop(name).map(f)) + + def forProduct2[T, A0: Decoder, A1: Decoder](name0: String, name1: String)(f: (A0, A1) => T): RowDecoder[T] = + fromDecoder((prop[A0](name0), prop[A1](name1)).mapN(f)) + + def forProduct3[T, A0: Decoder, A1: Decoder, A2: Decoder](name0: String, name1: String, name2: String)( + f: (A0, A1, A2) => T + ): RowDecoder[T] = + fromDecoder((prop[A0](name0), prop[A1](name1), prop[A2](name2)).mapN(f)) + + def forProduct4[T, A0: Decoder, A1: Decoder, A2: Decoder, A3: Decoder]( + name0: String, + name1: String, + name2: String, + name3: String + )( + f: (A0, A1, A2, A3) => T + ): RowDecoder[T] = + fromDecoder((prop[A0](name0), prop[A1](name1), prop[A2](name2), prop[A3](name3)).mapN(f)) + +} diff --git a/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/SparqlClient.scala b/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/SparqlClient.scala new file mode 100644 index 0000000000..cba2baa6ac --- /dev/null +++ b/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/SparqlClient.scala @@ -0,0 +1,60 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.triplesstore.client.http + +import cats.MonadThrow +import cats.effect.{Async, Resource} +import fs2.io.net.Network +import io.circe.{Decoder, Json} +import io.renku.jsonld.JsonLD +import org.http4s.ember.client.EmberClientBuilder +import org.typelevel.log4cats.Logger + +import scala.concurrent.duration._ + +trait SparqlClient[F[_]] { + + /** The sparql update operation. */ + def update(request: SparqlUpdate): F[Unit] + + /** Upload rdf data. Not an official sparql operation, but Jena supports it. */ + def upload(data: JsonLD): F[Unit] + + /** The sparql query operation, returning results as JSON. */ + def query(request: SparqlQuery): F[Json] + + def queryDecode[A](request: SparqlQuery)(implicit d: RowDecoder[A], F: MonadThrow[F]): F[List[A]] = { + val decoder = Decoder + .instance(c => c.downField("results").downField("bindings").as[List[A]]) + .withErrorMessage(s"Decoding Sparql result failed for request: $request") + F.flatMap(query(request))(json => decoder.decodeJson(json).fold(F.raiseError, F.pure)) + } +} + +object SparqlClient { + def apply[F[_]: Async: Network: Logger]( + connectionConfig: ConnectionConfig, + timeout: Duration = 20.minutes + ): Resource[F, SparqlClient[F]] = + EmberClientBuilder + .default[F] + .withTimeout(timeout) + .build + .map(c => new DefaultSparqlClient[F](c, connectionConfig)) +} diff --git a/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/SparqlQuery.scala b/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/SparqlQuery.scala new file mode 100644 index 0000000000..b6a685b5ab --- /dev/null +++ b/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/SparqlQuery.scala @@ -0,0 +1,38 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.triplesstore.client.http + +import org.http4s.headers.`Content-Type` +import org.http4s.{EntityEncoder, MediaType} + +trait SparqlQuery { + def render: String +} + +object SparqlQuery { + final case class Raw(render: String) extends SparqlQuery + + def raw(sparql: String): SparqlQuery = Raw(sparql) + + implicit def entityEncoder[F[_]]: EntityEncoder[F, SparqlQuery] = + EntityEncoder + .stringEncoder[F] + .contramap[SparqlQuery](_.render) + .withContentType(`Content-Type`(MediaType.application.`sparql-query`)) +} diff --git a/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/SparqlRequestError.scala b/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/SparqlRequestError.scala new file mode 100644 index 0000000000..1fe7b80f6c --- /dev/null +++ b/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/SparqlRequestError.scala @@ -0,0 +1,33 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.triplesstore.client.http + +import cats.effect.Concurrent +import cats.syntax.all._ +import org.http4s.{EntityDecoder, Response, Status} + +final case class SparqlRequestError(req: String, status: Status, body: String) + extends RuntimeException(s"Sparql request '$req' failed with status=$status: $body") { + override def fillInStackTrace(): Throwable = this +} + +object SparqlRequestError { + def apply[F[_]: Concurrent](req: String, resp: Response[F]): F[SparqlRequestError] = + EntityDecoder.decodeText(resp).map(str => SparqlRequestError(req, resp.status, str)) +} diff --git a/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/SparqlUpdate.scala b/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/SparqlUpdate.scala new file mode 100644 index 0000000000..d0adbe8718 --- /dev/null +++ b/triples-store-client/src/main/scala/io/renku/triplesstore/client/http/SparqlUpdate.scala @@ -0,0 +1,39 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.triplesstore.client.http + +import org.http4s.EntityEncoder +import org.http4s.headers.`Content-Type` +import org.http4s.implicits._ + +trait SparqlUpdate { + def render: String +} + +object SparqlUpdate { + final case class Raw(render: String) extends SparqlUpdate + + def raw(sparql: String): SparqlUpdate = Raw(sparql) + + implicit def entityEncoder[F[_]]: EntityEncoder[F, SparqlUpdate] = + EntityEncoder + .stringEncoder[F] + .contramap[SparqlUpdate](_.render) + .withContentType(`Content-Type`(mediaType"application/sparql-update")) +} diff --git a/triples-store-client/src/main/scala/io/renku/triplesstore/client/sparql/Fragment.scala b/triples-store-client/src/main/scala/io/renku/triplesstore/client/sparql/Fragment.scala index 0388d7fd30..b68473709d 100644 --- a/triples-store-client/src/main/scala/io/renku/triplesstore/client/sparql/Fragment.scala +++ b/triples-store-client/src/main/scala/io/renku/triplesstore/client/sparql/Fragment.scala @@ -19,8 +19,9 @@ package io.renku.triplesstore.client.sparql import cats.{Monoid, Show} +import io.renku.triplesstore.client.http.{SparqlQuery, SparqlUpdate} -final case class Fragment(sparql: String) { +final case class Fragment(sparql: String) extends SparqlUpdate with SparqlQuery { def isEmpty: Boolean = sparql.isBlank def nonEmpty: Boolean = !isEmpty @@ -33,6 +34,8 @@ final case class Fragment(sparql: String) { def stripMargin: Fragment = Fragment(sparql.stripMargin) + + override def render = sparql } object Fragment { diff --git a/triples-store-client/src/test/scala/io/renku/triplesstore/client/http/RetrySpec.scala b/triples-store-client/src/test/scala/io/renku/triplesstore/client/http/RetrySpec.scala new file mode 100644 index 0000000000..3577d934a3 --- /dev/null +++ b/triples-store-client/src/test/scala/io/renku/triplesstore/client/http/RetrySpec.scala @@ -0,0 +1,147 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.triplesstore.client.http + +import cats.effect._ +import cats.effect.testing.scalatest.AsyncIOSpec +import cats.syntax.all._ +import org.scalatest.Assertion +import org.scalatest.flatspec.AsyncFlatSpec +import org.scalatest.matchers.should +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.slf4j.Slf4jLogger + +import scala.concurrent.duration._ + +class RetrySpec extends AsyncFlatSpec with AsyncIOSpec with should.Matchers { + + implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO] + + case object RetryError extends RuntimeException("retry error") { + def unapply(ex: Throwable) = + ex match { + case RetryError => Some(this) + case _ => None + } + } + case object FinalError extends RuntimeException("final error") + + final class TestEffect(f: Int => Option[Throwable]) { + val execTimes: Ref[IO, List[FiniteDuration]] = Ref.unsafe(Nil) + + def exec: IO[Unit] = + Clock[IO].monotonic.flatMap { time => + execTimes.flatModify { list => + f(list.size) match { + case None => (time :: list, IO.unit) + case Some(r) => (time :: list, IO.raiseError(r)) + } + } + } + + def assertPauseTime(expect: FiniteDuration, epsilon: Duration = 50.millis): IO[Assertion] = + execTimes.get.map { times => + val lowerBound = expect - epsilon + val upperBound = expect + epsilon + + val diffs = times + .zip(times.tail) + .map { case (a, b) => a - b } + + all(diffs) should (be > lowerBound and be < upperBound) + } + } + + it should "not retry and not wait on first success" in { + val e = new TestEffect(_ => None) + val retry = new Retry[IO](Retry.RetryConfig(5.hours, 10)) + for { + r <- retry.retryWhen(RetryError.unapply(_).isDefined)(e.exec) + _ = r shouldBe () + execCount <- e.execTimes.get + _ = execCount.size shouldBe 1 + } yield () + } + + it should "not retry for unfiltered errors" in { + val e = new TestEffect(_ => FinalError.some) + val retry = Retry[IO](Retry.RetryConfig(5.hours, 10)) + for { + r <- retry.retryWhen(RetryError.unapply(_).isDefined)(e.exec).attempt + _ = r shouldBe Left(FinalError) + execCount <- e.execTimes.get + _ = execCount.size shouldBe 1 + } yield () + } + + it should "retry in the given interval until success result" in { + val e = new TestEffect( + Map( + 0 -> RetryError.some, + 1 -> RetryError.some + ).withDefaultValue(None) + ) + val retry = Retry[IO](Retry.RetryConfig(500.millis, 10)) + for { + r <- retry.retryWhen(RetryError.unapply(_).isDefined)(e.exec) + _ = r shouldBe () + execCount <- e.execTimes.get + _ = execCount.size shouldBe 3 + _ <- e.assertPauseTime(500.millis, epsilon = 100.millis) + } yield () + } + + it should "retry in the given interval until final error" in { + val e = new TestEffect( + Map( + 0 -> RetryError.some, + 1 -> RetryError.some, + 2 -> FinalError.some + ).withDefaultValue(None) + ) + val retry = Retry[IO](Retry.RetryConfig(500.millis, 10)) + for { + r <- retry.retryWhen(RetryError.unapply(_).isDefined)(e.exec).attempt + _ = r shouldBe Left(FinalError) + execCount <- e.execTimes.get + _ = execCount.size shouldBe 3 + _ <- e.assertPauseTime(500.millis) + } yield () + } + + it should "retry and give up after max tries" in { + val e = new TestEffect( + Map( + 0 -> RetryError.some, + 1 -> RetryError.some, + 2 -> RetryError.some, + 3 -> RetryError.some + ).withDefaultValue(None) + ) + val retry = Retry[IO](Retry.RetryConfig(1.millis, 3)) + for { + r <- retry.retryWhen(RetryError.unapply(_).isDefined)(e.exec).attempt + Left(ex: Retry.RetryExceeded) = r + _ = ex.getCause shouldBe RetryError + _ = ex.errors.size shouldBe 3 + execCount <- e.execTimes.get + _ = execCount.size shouldBe 3 + } yield () + } +} diff --git a/triples-store-client/src/test/scala/io/renku/triplesstore/client/http/SparqlClientSpec.scala b/triples-store-client/src/test/scala/io/renku/triplesstore/client/http/SparqlClientSpec.scala new file mode 100644 index 0000000000..0776d15e45 --- /dev/null +++ b/triples-store-client/src/test/scala/io/renku/triplesstore/client/http/SparqlClientSpec.scala @@ -0,0 +1,113 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.triplesstore.client.http + +import cats.effect._ +import cats.effect.testing.scalatest.AsyncIOSpec +import io.renku.jsonld.syntax._ +import io.renku.jsonld.{EntityId, EntityTypes, JsonLD, JsonLDEncoder, Schema} +import org.scalatest.flatspec.AsyncFlatSpec +import org.scalatest.matchers.should +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.slf4j.Slf4jLogger +import io.renku.triplesstore.client.syntax._ +import io.renku.triplesstore.client.util.JenaContainerSpec + +import java.time.Instant + +class SparqlClientSpec extends AsyncFlatSpec with AsyncIOSpec with JenaContainerSpec with should.Matchers { + implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO] + val dataset = "projects" + + val testQuery = sparql"""PREFIX schema: + |SELECT * WHERE { + | graph { + | ?projId schema:dateModified ?dateModified + | } + |} LIMIT 100""".stripMargin + + it should "run sparql queries" in { + withDataset(dataset).use { c => + for { + _ <- c.update( + sparql"""PREFIX schema: + |PREFIX rdf: + |PREFIX rdfs: + |INSERT DATA { + | Graph { + | + | schema:dateModified "1988-09-21T17:44:42.325Z"^^. + | } + |} + |""".stripMargin + ) + r <- c.query(testQuery) + obj = r.asObject.getOrElse(sys.error(s"Unexpected response: $r")) + _ = obj("head").get.isObject shouldBe true + _ = obj("results").get.isObject shouldBe true + decoded <- c.queryDecode[Data](testQuery) + _ = decoded shouldBe List( + Data( + "https://tygtmzjt:8901/EWxEPoLMmg/projects/123", + Instant.parse("1988-09-21T17:44:42.325Z") + ) + ) + } yield () + } + } + + it should "upload jsonld" in { + val data = Data("http://localhost/project/123", Instant.now()) + withDataset(dataset).use { c => + for { + _ <- c.upload(data.asJsonLD) + r <- c.queryDecode[Data](SparqlQuery.raw("""PREFIX schema: + |SELECT ?projId ?dateModified + |WHERE { + | ?p a schema:Person; + | schema:dateModified ?dateModified; + | schema:project ?projId. + |} + |""".stripMargin)) + _ = r.contains(data) shouldBe true + } yield () + } + } + + object Schemas { + val renku: Schema = Schema.from("https://swissdatasciencecenter.github.io/renku-ontology", separator = "#") + val schema: Schema = Schema.from("http://schema.org") + } + + case class Data(projId: String, modified: Instant) + object Data { + implicit val rowDecoder: RowDecoder[Data] = + RowDecoder.forProduct2("projId", "dateModified")(Data.apply) + + implicit val jsonLDEncoder: JsonLDEncoder[Data] = + JsonLDEncoder.instance { data => + JsonLD.entity( + EntityId.blank, + EntityTypes.of(Schemas.schema / "Person"), + Schemas.schema / "dateModified" -> data.modified.asJsonLD, + Schemas.schema / "project" -> data.projId.asJsonLD + ) + } + } +} diff --git a/triples-store-client/src/test/scala/io/renku/triplesstore/client/util/JenaContainer.scala b/triples-store-client/src/test/scala/io/renku/triplesstore/client/util/JenaContainer.scala new file mode 100644 index 0000000000..c96eaee81f --- /dev/null +++ b/triples-store-client/src/test/scala/io/renku/triplesstore/client/util/JenaContainer.scala @@ -0,0 +1,65 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.triplesstore.client.util + +import com.dimafeng.testcontainers.{FixedHostPortGenericContainer, GenericContainer, SingleContainer} +import eu.timepit.refined.api.Refined +import eu.timepit.refined.auto._ +import eu.timepit.refined.numeric.Positive +import org.http4s.Uri +import org.testcontainers.containers.wait.strategy.Wait +import org.testcontainers.containers +import org.testcontainers.utility.DockerImageName + +object JenaContainer { + val version = "0.0.21" + val imageName = s"renku/renku-jena:$version" + val image = DockerImageName.parse(imageName) + + def create(mode: JenaRunMode): SingleContainer[_] = mode match { + case JenaRunMode.GenericContainer => + GenericContainer( + dockerImage = imageName, + exposedPorts = Seq(3030), + waitStrategy = Wait forHttp "/$/ping" + ) + case JenaRunMode.FixedPortContainer(fixedPort) => + FixedHostPortGenericContainer( + imageName = imageName, + exposedPorts = Seq(3030), + exposedHostPort = fixedPort, + exposedContainerPort = fixedPort, + waitStrategy = Wait forHttp "/$/ping" + ) + case JenaRunMode.Local(_) => + new GenericContainer(new containers.GenericContainer("") { + override def start(): Unit = () + override def stop(): Unit = () + }) + } + + def serverPort(mode: JenaRunMode, cnt: SingleContainer[_]): Int Refined Positive = mode match { + case JenaRunMode.GenericContainer => Refined.unsafeApply(cnt.mappedPort(cnt.exposedPorts.head)) + case JenaRunMode.FixedPortContainer(port) => port + case JenaRunMode.Local(port) => port + } + + def fusekiUrl(mode: JenaRunMode, cnt: SingleContainer[_]): String = s"http://localhost:${serverPort(mode, cnt)}" + def fusekiUri(mode: JenaRunMode, cnt: SingleContainer[_]): Uri = Uri.unsafeFromString(fusekiUrl(mode, cnt)) +} diff --git a/triples-store-client/src/test/scala/io/renku/triplesstore/client/util/JenaContainerDirectSpec.scala b/triples-store-client/src/test/scala/io/renku/triplesstore/client/util/JenaContainerDirectSpec.scala new file mode 100644 index 0000000000..5a7fb370c4 --- /dev/null +++ b/triples-store-client/src/test/scala/io/renku/triplesstore/client/util/JenaContainerDirectSpec.scala @@ -0,0 +1,49 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.triplesstore.client.util + +import cats.effect.IO +import cats.effect.unsafe.IORuntime +import io.renku.triplesstore.client.http.{FusekiClient, SparqlClient} +import org.scalatest.{BeforeAndAfterAll, Suite} +import org.typelevel.log4cats.Logger + +/** Trait for having a client directly accessible using "unsafe" effects. */ +trait JenaContainerDirectSpec extends JenaContainerSpec with BeforeAndAfterAll { self: Suite => + implicit def logger: Logger[IO] + + implicit val ioRuntime: IORuntime + + private var tearDown: IO[Unit] = IO.unit + protected var fusekiClient: FusekiClient[IO] = _ + + protected def sparqlClient(datasetName: String): SparqlClient[IO] = fusekiClient.sparql(datasetName) + + override def afterStart(): Unit = { + super.afterStart() + val (client, shutdown) = clientResource.allocated.unsafeRunSync() + this.tearDown = shutdown + this.fusekiClient = client + } + + override def afterAll(): Unit = { + super.afterAll() + tearDown.unsafeRunSync() + } +} diff --git a/triples-store-client/src/test/scala/io/renku/triplesstore/client/util/JenaContainerSpec.scala b/triples-store-client/src/test/scala/io/renku/triplesstore/client/util/JenaContainerSpec.scala new file mode 100644 index 0000000000..2fd529915b --- /dev/null +++ b/triples-store-client/src/test/scala/io/renku/triplesstore/client/util/JenaContainerSpec.scala @@ -0,0 +1,55 @@ +/* + * Copyright 2023 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.triplesstore.client.util + +import cats.effect.IO +import cats.effect.kernel.Resource +import cats.syntax.all._ +import com.dimafeng.testcontainers.ForAllTestContainer +import io.renku.triplesstore.client.http.{ConnectionConfig, FusekiClient, SparqlClient} +import org.http4s.{BasicCredentials, Uri} +import org.scalatest.Suite +import org.typelevel.log4cats.Logger + +import scala.concurrent.duration._ + +trait JenaContainerSpec extends ForAllTestContainer { self: Suite => + + protected val runMode: JenaRunMode = JenaRunMode.GenericContainer + protected val timeout: Duration = 2.minutes + + lazy val container = JenaContainer.create(runMode) + + protected lazy val jenaUri: Uri = JenaContainer.fusekiUri(runMode, container) + + def clientResource(implicit L: Logger[IO]): Resource[IO, FusekiClient[IO]] = { + val cc = ConnectionConfig(jenaUri, Some(BasicCredentials("admin", "admin")), None) + FusekiClient[IO](cc, timeout) + } + + def sparqlResource(datasetName: String)(implicit L: Logger[IO]): Resource[IO, SparqlClient[IO]] = + clientResource.map(_.sparql(datasetName)) + + def withDataset(name: String)(implicit L: Logger[IO]): Resource[IO, SparqlClient[IO]] = { + def datasetResource(c: FusekiClient[IO]) = + Resource.make(c.createDataset(name, persistent = false))(_ => c.deleteDataset(name)) + + clientResource.flatMap(c => datasetResource(c).as(c.sparql(name))) + } +} diff --git a/graph-commons/src/test/scala/io/renku/triplesstore/JenaRunMode.scala b/triples-store-client/src/test/scala/io/renku/triplesstore/client/util/JenaRunMode.scala similarity index 97% rename from graph-commons/src/test/scala/io/renku/triplesstore/JenaRunMode.scala rename to triples-store-client/src/test/scala/io/renku/triplesstore/client/util/JenaRunMode.scala index 5e750b40c2..7e063fb148 100644 --- a/graph-commons/src/test/scala/io/renku/triplesstore/JenaRunMode.scala +++ b/triples-store-client/src/test/scala/io/renku/triplesstore/client/util/JenaRunMode.scala @@ -16,7 +16,7 @@ * limitations under the License. */ -package io.renku.triplesstore +package io.renku.triplesstore.client.util import eu.timepit.refined.api.Refined import eu.timepit.refined.numeric.Positive