From 5456ed9a8b5162933e7c8a121ea4d163f89385a7 Mon Sep 17 00:00:00 2001 From: Anton Lamtev Date: Sat, 13 Jun 2020 15:00:07 +0300 Subject: [PATCH] Hotfix for #92 & #93 (#94) * fix bug with statement.fetchSize(...) * use reator java-agent for clear stacktraces * use (Mono|Flux).usingWhen(...) for executing SQL-requests on connections * improve clone detector * log thread name * change postgres configuration --- api/Dockerfile | 2 +- api/build.gradle.kts | 18 ++- .../org/accula/api/AcculaApiApplication.java | 3 + .../org/accula/api/code/CodeLoaderImpl.java | 6 +- .../java/org/accula/api/code/FileFilter.java | 3 +- .../java/org/accula/api/config/DbConfig.java | 6 + .../java/org/accula/api/config/WebConfig.java | 10 +- .../org/accula/api/db/repo/CloneRepoImpl.java | 144 +++++++++-------- .../api/db/repo/CommitSnapshotRepoImpl.java | 107 ++++++------- .../api/db/repo/ConnectionProvidedRepo.java | 52 ++++++ .../api/db/repo/GithubRepoRepoImpl.java | 68 ++++---- .../api/db/repo/GithubUserRepoImpl.java | 64 ++++---- .../accula/api/db/repo/ProjectRepoImpl.java | 150 ++++++++---------- .../org/accula/api/db/repo/PullRepoImpl.java | 111 ++++++------- .../java/org/accula/api/db/repo/Repos.java | 58 ------- .../org/accula/api/db/repo/UserRepoImpl.java | 112 +++++++------ .../api/detector/PrimitiveCloneDetector.java | 10 +- .../api/handlers/GithubWebhookHandler.java | 18 ++- .../accula/api/handlers/ProjectsHandler.java | 4 +- .../api/handlers/util/ProjectUpdater.java | 7 + api/src/main/resources/application.yml | 6 +- api/src/main/resources/log4j.properties | 6 +- .../api/detector/CloneDetectorTest.java | 2 +- docker-compose.yml | 4 +- gradle/wrapper/gradle-wrapper.properties | 2 +- 25 files changed, 482 insertions(+), 491 deletions(-) create mode 100644 api/src/main/java/org/accula/api/db/repo/ConnectionProvidedRepo.java delete mode 100644 api/src/main/java/org/accula/api/db/repo/Repos.java diff --git a/api/Dockerfile b/api/Dockerfile index db725b40..c24f0e31 100644 --- a/api/Dockerfile +++ b/api/Dockerfile @@ -5,4 +5,4 @@ RUN gradle build -x test -x web:build --no-daemon FROM openjdk:14-alpine COPY --from=build /app/api/build/libs/*.jar accula.jar -ENTRYPOINT ["java","-jar","/accula.jar"] +ENTRYPOINT ["java","--enable-preview","-jar","/accula.jar"] diff --git a/api/build.gradle.kts b/api/build.gradle.kts index f57fa443..9dc4e54a 100644 --- a/api/build.gradle.kts +++ b/api/build.gradle.kts @@ -1,10 +1,13 @@ plugins { id("org.springframework.boot") version "2.3.0.RELEASE" id("io.spring.dependency-management") version "1.0.9.RELEASE" + id("net.bytebuddy.byte-buddy-gradle-plugin") version "1.10.11" } version = "1.0-SNAPSHOT" +val byteBuddyPlugin: Configuration by configurations.creating + dependencies { implementation("org.springframework.boot:spring-boot-starter-webflux") testImplementation("org.springframework.boot:spring-boot-starter-test") { @@ -13,7 +16,11 @@ dependencies { implementation("org.springframework.boot:spring-boot-starter-security") testImplementation("org.springframework.security:spring-security-test") - testImplementation("io.projectreactor:reactor-test:3.3.6.RELEASE") + implementation("net.bytebuddy:byte-buddy:1.10.11") + implementation("io.projectreactor:reactor-tools") + byteBuddyPlugin(group = "io.projectreactor", name = "reactor-tools", classifier = "original") + + testImplementation("io.projectreactor:reactor-test") implementation("org.springframework.boot:spring-boot-starter-actuator") @@ -25,7 +32,7 @@ dependencies { implementation("org.springframework.boot:spring-boot-starter-data-r2dbc") implementation("io.r2dbc:r2dbc-postgresql:0.8.2.RELEASE") implementation("io.r2dbc:r2dbc-pool:0.8.2.RELEASE") - implementation("io.r2dbc:r2dbc-spi:0.8.1.RELEASE") + implementation("io.r2dbc:r2dbc-spi:0.8.2.RELEASE") implementation("org.slf4j:slf4j-api:2.0.0-alpha1") implementation("org.slf4j:slf4j-log4j12:2.0.0-alpha1") @@ -40,3 +47,10 @@ dependencies { annotationProcessor("org.springframework.boot:spring-boot-configuration-processor") } + +byteBuddy { + transformation(closureOf { + plugin = "reactor.tools.agent.ReactorDebugByteBuddyPlugin" + setClassPath(byteBuddyPlugin) + }) +} diff --git a/api/src/main/java/org/accula/api/AcculaApiApplication.java b/api/src/main/java/org/accula/api/AcculaApiApplication.java index 0a7b721f..d1e45b74 100644 --- a/api/src/main/java/org/accula/api/AcculaApiApplication.java +++ b/api/src/main/java/org/accula/api/AcculaApiApplication.java @@ -2,10 +2,13 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import reactor.tools.agent.ReactorDebugAgent; @SpringBootApplication +@SuppressWarnings("PMD.UseUtilityClass") public class AcculaApiApplication { public static void main(final String[] args) { + ReactorDebugAgent.init(); SpringApplication.run(AcculaApiApplication.class, args); } } diff --git a/api/src/main/java/org/accula/api/code/CodeLoaderImpl.java b/api/src/main/java/org/accula/api/code/CodeLoaderImpl.java index e7531b1c..67aeca44 100644 --- a/api/src/main/java/org/accula/api/code/CodeLoaderImpl.java +++ b/api/src/main/java/org/accula/api/code/CodeLoaderImpl.java @@ -60,12 +60,11 @@ public Flux getFiles(final CommitSnapshot snapshot) { @Override public Flux getFiles(final CommitSnapshot snapshot, final FileFilter filter) { return getRepository(snapshot) - .publishOn(scheduler) .switchIfEmpty(Mono.error(REPO_NOT_FOUND)) + .publishOn(scheduler) .flatMapMany(repo -> Flux.fromIterable(getObjectLoaders(repo, snapshot.getSha()))) .filter(filenameAndLoader -> filter.test(filenameAndLoader.getT1())) .map(filenameAndLoader -> new FileEntity(snapshot, filenameAndLoader.getT1(), getFileContent(filenameAndLoader.getT2()))) - .switchIfEmpty(Mono.error(CUT_ERROR)) .onErrorResume(e -> e instanceof MissingObjectException, e -> { log.info("Most probably branch with the commit {} has been deleted: {}", snapshot.toString(), e); return Flux.empty(); @@ -75,8 +74,8 @@ public Flux getFiles(final CommitSnapshot snapshot, final FileFilter @Override public Mono getFile(final CommitSnapshot snapshot, final String filename) { return getRepository(snapshot) - .publishOn(scheduler) .switchIfEmpty(Mono.error(REPO_NOT_FOUND)) + .publishOn(scheduler) .flatMap(repo -> Mono.justOrEmpty(getObjectLoader(repo, snapshot.getSha(), filename))) .switchIfEmpty(Mono.error(FILE_NOT_FOUND)) .map(loader -> new FileEntity(snapshot, filename, getFileContent(loader))) @@ -98,6 +97,7 @@ public Mono getFileSnippet(final CommitSnapshot snapshot, final Stri .switchIfEmpty(Mono.error(CUT_ERROR)); } + @Override public Flux> getDiff(final CommitSnapshot base, final CommitSnapshot head) { return getDiff(base, head, FileFilter.ALL); } diff --git a/api/src/main/java/org/accula/api/code/FileFilter.java b/api/src/main/java/org/accula/api/code/FileFilter.java index dade5621..25c4a051 100644 --- a/api/src/main/java/org/accula/api/code/FileFilter.java +++ b/api/src/main/java/org/accula/api/code/FileFilter.java @@ -15,7 +15,8 @@ public interface FileFilter extends Predicate { FileFilter TESTS = file -> file.contains("Test"); FileFilter SRC_JAVA = JAVA.and(TESTS.negate()); - default FileFilter and(FileFilter other) { + @Override + default FileFilter and(Predicate other) { return f -> test(f) && other.test(f); } diff --git a/api/src/main/java/org/accula/api/config/DbConfig.java b/api/src/main/java/org/accula/api/config/DbConfig.java index d0316d21..a1f49c9d 100644 --- a/api/src/main/java/org/accula/api/config/DbConfig.java +++ b/api/src/main/java/org/accula/api/config/DbConfig.java @@ -5,6 +5,7 @@ import io.r2dbc.spi.ConnectionFactories; import io.r2dbc.spi.ConnectionFactoryOptions; import lombok.RequiredArgsConstructor; +import org.accula.api.db.repo.ConnectionProvidedRepo; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -54,4 +55,9 @@ public ConnectionPool connectionFactory() { return new ConnectionPool(poolConfig); } + + @Bean + public ConnectionProvidedRepo.ConnectionProvider connectionProvider(final ConnectionPool connectionPool) { + return connectionPool::create; + } } diff --git a/api/src/main/java/org/accula/api/config/WebConfig.java b/api/src/main/java/org/accula/api/config/WebConfig.java index 7954b9a1..8cbe20bb 100644 --- a/api/src/main/java/org/accula/api/config/WebConfig.java +++ b/api/src/main/java/org/accula/api/config/WebConfig.java @@ -25,7 +25,7 @@ @RequiredArgsConstructor @EnableConfigurationProperties(WebhookProperties.class) public class WebConfig implements WebFluxConfigurer { - private final CurrentUserRepo currentUserRepository; + private final CurrentUserRepo currentUserRepo; @Bean public WebClient webClient() { @@ -34,14 +34,14 @@ public WebClient webClient() { @Bean public GithubClient.AccessTokenProvider githubAccessTokenProvider() { - return () -> currentUserRepository + return () -> currentUserRepo .get() - .flatMap(user -> Mono.justOrEmpty(user.getGithubAccessToken())); + .flatMap(user -> Mono.just(user.getGithubAccessToken())); } @Bean public GithubClient.LoginProvider githubLoginProvider() { - return () -> currentUserRepository + return () -> currentUserRepo .get() .flatMap(user -> Mono.just(user.getGithubUser().getLogin())); } @@ -54,6 +54,6 @@ public RepositoryProvider repositoryManager(@Value("${accula.reposPath}") final @Bean public CloneDetector cloneDetector() { - return new PrimitiveCloneDetector(5, 7); + return new PrimitiveCloneDetector(3, 8); } } diff --git a/api/src/main/java/org/accula/api/db/repo/CloneRepoImpl.java b/api/src/main/java/org/accula/api/db/repo/CloneRepoImpl.java index 02f1eea3..bb71edc6 100644 --- a/api/src/main/java/org/accula/api/db/repo/CloneRepoImpl.java +++ b/api/src/main/java/org/accula/api/db/repo/CloneRepoImpl.java @@ -1,9 +1,9 @@ package org.accula.api.db.repo; -import io.r2dbc.pool.ConnectionPool; import io.r2dbc.postgresql.api.PostgresqlStatement; import io.r2dbc.spi.Connection; import io.r2dbc.spi.Row; +import lombok.Getter; import lombok.RequiredArgsConstructor; import org.accula.api.db.model.Clone; import org.springframework.stereotype.Component; @@ -19,8 +19,9 @@ */ @Component @RequiredArgsConstructor -public final class CloneRepoImpl implements CloneRepo { - private final ConnectionPool connectionPool; +public final class CloneRepoImpl implements CloneRepo, ConnectionProvidedRepo { + @Getter + private final ConnectionProvider connectionProvider; @Override public Mono insert(final Clone clone) { @@ -32,19 +33,18 @@ public Flux insert(final Collection clones) { if (clones.isEmpty()) { return Flux.empty(); } + final var cloneList = clones instanceof ArrayList ? (ArrayList) clones : new ArrayList<>(clones); - return connectionPool - .create() - .flatMapMany(connection -> { - final var statement = insertStatement(connection); - cloneList.forEach(clone -> applyInsertBindings(clone, statement).add()); - statement.fetchSize(cloneList.size()); + return manyWithConnection(connection -> { + final var statement = insertStatement(connection); + cloneList.forEach(clone -> applyInsertBindings(clone, statement).add()); - return Repos.convertMany(statement.execute(), connection, row -> Converters - .value(row, "id", Long.class)) - .zipWithIterable(cloneList, (id, clone) -> clone.toBuilder().id(id).build()); - }); + return statement + .execute() + .flatMap(result -> ConnectionProvidedRepo.column(result, "id", Long.class)) + .zipWithIterable(cloneList, (id, clone) -> clone.toBuilder().id(id).build()); + }); } @Override @@ -54,66 +54,64 @@ public Mono findById(final Long id) { @Override public Flux findByTargetCommitSnapshotSha(final String sha) { - return connectionPool - .create() - .flatMapMany(connection -> Mono - .from(connection - .createStatement(""" - SELECT clone.id AS id, - target.sha AS target_sha, - target.branch AS target_branch, - target_snap_to_pull.pull_id AS target_pull_id, - target_repo.id AS target_repo_id, - target_repo.name AS target_repo_name, - target_repo.description AS target_repo_description, - target_repo_owner.id AS target_repo_owner_id, - target_repo_owner.login AS target_repo_owner_login, - target_repo_owner.name AS target_repo_owner_name, - target_repo_owner.avatar AS target_repo_owner_avatar, - target_repo_owner.is_org AS target_repo_owner_is_org, - clone.target_file AS target_file, - clone.target_from_line AS target_from_line, - clone.target_to_line AS target_to_line, - source.sha AS source_sha, - source.branch AS source_branch, - source_snap_to_pull.pull_id AS source_pull_id, - source_repo.id AS source_repo_id, - source_repo.name AS source_repo_name, - source_repo.description AS source_repo_description, - source_repo_owner.id AS source_repo_owner_id, - source_repo_owner.login AS source_repo_owner_login, - source_repo_owner.name AS source_repo_owner_name, - source_repo_owner.avatar AS source_repo_owner_avatar, - source_repo_owner.is_org AS source_repo_owner_is_org, - clone.source_file AS source_file, - clone.source_from_line AS source_from_line, - clone.source_to_line AS source_to_line - FROM clone - JOIN commit_snapshot target - ON clone.target_commit_sha = target.sha - AND clone.target_repo_id = target.repo_id - JOIN repo_github target_repo - ON target.repo_id = target_repo.id - JOIN user_github target_repo_owner - ON target_repo.owner_id = target_repo_owner.id - JOIN commit_snapshot_pull target_snap_to_pull - ON target.sha = target_snap_to_pull.commit_snapshot_sha - AND target.repo_id = target_snap_to_pull.commit_snapshot_repo_id - JOIN commit_snapshot source - ON clone.source_commit_sha = source.sha - AND clone.source_repo_id = source.repo_id - JOIN repo_github source_repo - ON source.repo_id = source_repo.id - JOIN user_github source_repo_owner - ON source_repo.owner_id = source_repo_owner.id - JOIN commit_snapshot_pull source_snap_to_pull - ON source.sha = source_snap_to_pull.commit_snapshot_sha - AND source.repo_id = source_snap_to_pull.commit_snapshot_repo_id - WHERE clone.target_commit_sha = $1 - """) - .bind("$1", sha) - .execute()) - .flatMapMany(result -> Repos.convertMany(result, connection, this::convert))); + return manyWithConnection(connection -> Mono + .from(connection + .createStatement(""" + SELECT clone.id AS id, + target.sha AS target_sha, + target.branch AS target_branch, + target_snap_to_pull.pull_id AS target_pull_id, + target_repo.id AS target_repo_id, + target_repo.name AS target_repo_name, + target_repo.description AS target_repo_description, + target_repo_owner.id AS target_repo_owner_id, + target_repo_owner.login AS target_repo_owner_login, + target_repo_owner.name AS target_repo_owner_name, + target_repo_owner.avatar AS target_repo_owner_avatar, + target_repo_owner.is_org AS target_repo_owner_is_org, + clone.target_file AS target_file, + clone.target_from_line AS target_from_line, + clone.target_to_line AS target_to_line, + source.sha AS source_sha, + source.branch AS source_branch, + source_snap_to_pull.pull_id AS source_pull_id, + source_repo.id AS source_repo_id, + source_repo.name AS source_repo_name, + source_repo.description AS source_repo_description, + source_repo_owner.id AS source_repo_owner_id, + source_repo_owner.login AS source_repo_owner_login, + source_repo_owner.name AS source_repo_owner_name, + source_repo_owner.avatar AS source_repo_owner_avatar, + source_repo_owner.is_org AS source_repo_owner_is_org, + clone.source_file AS source_file, + clone.source_from_line AS source_from_line, + clone.source_to_line AS source_to_line + FROM clone + JOIN commit_snapshot target + ON clone.target_commit_sha = target.sha + AND clone.target_repo_id = target.repo_id + JOIN repo_github target_repo + ON target.repo_id = target_repo.id + JOIN user_github target_repo_owner + ON target_repo.owner_id = target_repo_owner.id + JOIN commit_snapshot_pull target_snap_to_pull + ON target.sha = target_snap_to_pull.commit_snapshot_sha + AND target.repo_id = target_snap_to_pull.commit_snapshot_repo_id + JOIN commit_snapshot source + ON clone.source_commit_sha = source.sha + AND clone.source_repo_id = source.repo_id + JOIN repo_github source_repo + ON source.repo_id = source_repo.id + JOIN user_github source_repo_owner + ON source_repo.owner_id = source_repo_owner.id + JOIN commit_snapshot_pull source_snap_to_pull + ON source.sha = source_snap_to_pull.commit_snapshot_sha + AND source.repo_id = source_snap_to_pull.commit_snapshot_repo_id + WHERE clone.target_commit_sha = $1 + """) + .bind("$1", sha) + .execute()) + .flatMapMany(result -> ConnectionProvidedRepo.convertMany(result, this::convert))); } private static PostgresqlStatement insertStatement(final Connection connection) { diff --git a/api/src/main/java/org/accula/api/db/repo/CommitSnapshotRepoImpl.java b/api/src/main/java/org/accula/api/db/repo/CommitSnapshotRepoImpl.java index 068d7232..a15f853a 100644 --- a/api/src/main/java/org/accula/api/db/repo/CommitSnapshotRepoImpl.java +++ b/api/src/main/java/org/accula/api/db/repo/CommitSnapshotRepoImpl.java @@ -1,10 +1,10 @@ package org.accula.api.db.repo; -import io.r2dbc.pool.ConnectionPool; import io.r2dbc.postgresql.api.PostgresqlResult; import io.r2dbc.postgresql.api.PostgresqlStatement; import io.r2dbc.spi.Connection; import io.r2dbc.spi.Row; +import lombok.Getter; import lombok.RequiredArgsConstructor; import org.accula.api.db.model.CommitSnapshot; import org.springframework.stereotype.Component; @@ -19,17 +19,16 @@ */ @Component @RequiredArgsConstructor -public final class CommitSnapshotRepoImpl implements CommitSnapshotRepo { - private final ConnectionPool connectionPool; +public final class CommitSnapshotRepoImpl implements CommitSnapshotRepo, ConnectionProvidedRepo { + @Getter + private final ConnectionProvider connectionProvider; @Override public Mono insert(final CommitSnapshot commitSnapshot) { - return connectionPool - .create() - .flatMap(connection -> Mono - .from(applyInsertBindings(commitSnapshot, insertStatement(connection)) - .execute()) - .flatMap(result -> Repos.closeAndReturn(connection, commitSnapshot))); + return withConnection(connection -> applyInsertBindings(commitSnapshot, insertStatement(connection)) + .execute() + .flatMap(PostgresqlResult::getRowsUpdated) + .then(Mono.just(commitSnapshot))); } @Override @@ -37,18 +36,16 @@ public Flux insert(final Collection commitSnapsh if (commitSnapshots.isEmpty()) { return Flux.empty(); } - return connectionPool - .create() - .flatMapMany(connection -> { - final var statement = insertStatement(connection); - commitSnapshots.forEach(snapshot -> applyInsertBindings(snapshot, statement).add()); - statement.fetchSize(commitSnapshots.size()); - - return statement - .execute() - .flatMap(PostgresqlResult::getRowsUpdated) - .thenMany(Repos.closeAndReturn(connection, commitSnapshots)); - }); + + return manyWithConnection(connection -> { + final var statement = insertStatement(connection); + commitSnapshots.forEach(snapshot -> applyInsertBindings(snapshot, statement).add()); + + return statement + .execute() + .flatMap(PostgresqlResult::getRowsUpdated) + .thenMany(Flux.fromIterable(commitSnapshots)); + }); } @Override @@ -61,51 +58,45 @@ public Flux mapToPulls(final Collection commitSn return Flux.empty(); } - return connectionPool - .create() - .flatMapMany(connection -> { - final var statement = (PostgresqlStatement) connection.createStatement(""" - INSERT INTO commit_snapshot_pull ( - commit_snapshot_sha, commit_snapshot_repo_id, pull_id) - VALUES ($1, $2, $3) - ON CONFLICT (commit_snapshot_sha, commit_snapshot_repo_id, pull_id) DO NOTHING - """); - commitSnapshots.forEach(commitSnapshot -> { - final var snapId = commitSnapshot.getId(); - statement.bind("$1", snapId.getSha()) - .bind("$2", snapId.getRepoId()) - .bind("$3", Objects.requireNonNull(commitSnapshot.getPullId())) - .add(); - }); - statement.fetchSize(commitSnapshots.size()); - - return statement.execute() - .flatMap(PostgresqlResult::getRowsUpdated) - .thenMany(Repos.closeAndReturn(connection, commitSnapshots)); - }); + return manyWithConnection(connection -> { + final var statement = (PostgresqlStatement) connection.createStatement(""" + INSERT INTO commit_snapshot_pull ( + commit_snapshot_sha, commit_snapshot_repo_id, pull_id) + VALUES ($1, $2, $3) + ON CONFLICT (commit_snapshot_sha, commit_snapshot_repo_id, pull_id) DO NOTHING + """); + commitSnapshots.forEach(commitSnapshot -> { + final var snapId = commitSnapshot.getId(); + statement.bind("$1", snapId.getSha()) + .bind("$2", snapId.getRepoId()) + .bind("$3", Objects.requireNonNull(commitSnapshot.getPullId())) + .add(); + }); + + return statement.execute() + .flatMap(PostgresqlResult::getRowsUpdated) + .thenMany(Flux.fromIterable(commitSnapshots)); + }); } @Override public Mono findById(final CommitSnapshot.Id id) { - return connectionPool - .create() - .flatMap(connection -> Mono - .from(applySelectBindings(id, selectStatement(connection)) - .execute()) - .flatMap(result -> Repos.convert(result, connection, this::convert))); + return withConnection(connection -> Mono + .from(applySelectBindings(id, selectStatement(connection)) + .execute()) + .flatMap(result -> ConnectionProvidedRepo.convert(result, this::convert))); } @Override public Flux findById(final Collection ids) { - return connectionPool - .create() - .flatMapMany(connection -> { - final var statement = selectStatement(connection); - ids.forEach(id -> applySelectBindings(id, statement).add()); - statement.fetchSize(ids.size()); - - return Repos.convertMany(statement.execute(), connection, this::convert); - }); + return manyWithConnection(connection -> { + final var statement = selectStatement(connection); + ids.forEach(id -> applySelectBindings(id, statement).add()); + + return statement + .execute() + .flatMap(result -> ConnectionProvidedRepo.convert(result, this::convert)); + }); } private static PostgresqlStatement insertStatement(final Connection connection) { diff --git a/api/src/main/java/org/accula/api/db/repo/ConnectionProvidedRepo.java b/api/src/main/java/org/accula/api/db/repo/ConnectionProvidedRepo.java new file mode 100644 index 00000000..948f7987 --- /dev/null +++ b/api/src/main/java/org/accula/api/db/repo/ConnectionProvidedRepo.java @@ -0,0 +1,52 @@ +package org.accula.api.db.repo; + +import io.r2dbc.spi.Connection; +import io.r2dbc.spi.Result; +import io.r2dbc.spi.Row; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.Objects; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * @author Anton Lamtev + */ +public interface ConnectionProvidedRepo { + ConnectionProvider getConnectionProvider(); + + default Mono withConnection(final Function> connectionUse) { + return Mono.usingWhen( + getConnectionProvider().get(), + connectionUse, + Connection::close + ); + } + + default Flux manyWithConnection(final Function> connectionUse) { + return Flux.usingWhen( + getConnectionProvider().get(), + connectionUse, + Connection::close + ); + } + + static Mono column(final Result result, final String name, final Class clazz) { + return Mono.from(result + .map((row, metadata) -> Objects.requireNonNull(row.get(name, clazz)))); + } + + static Mono convert(final Result result, final Function transform) { + return Mono.from(result.map((row, metadata) -> transform.apply(row))); + } + + static Flux convertMany(final Result result, final Function transform) { + return Flux.from(result.map((row, metadata) -> transform.apply(row))); + } + + interface ConnectionProvider extends Supplier> { + @Override + Mono get(); + } +} diff --git a/api/src/main/java/org/accula/api/db/repo/GithubRepoRepoImpl.java b/api/src/main/java/org/accula/api/db/repo/GithubRepoRepoImpl.java index 6d53cc09..29548fdd 100644 --- a/api/src/main/java/org/accula/api/db/repo/GithubRepoRepoImpl.java +++ b/api/src/main/java/org/accula/api/db/repo/GithubRepoRepoImpl.java @@ -1,10 +1,10 @@ package org.accula.api.db.repo; -import io.r2dbc.pool.ConnectionPool; import io.r2dbc.postgresql.api.PostgresqlResult; import io.r2dbc.postgresql.api.PostgresqlStatement; import io.r2dbc.spi.Connection; import io.r2dbc.spi.Row; +import lombok.Getter; import lombok.RequiredArgsConstructor; import org.accula.api.db.model.GithubRepo; import org.springframework.stereotype.Component; @@ -18,23 +18,22 @@ */ @Component @RequiredArgsConstructor -public final class GithubRepoRepoImpl implements GithubRepoRepo { - private final ConnectionPool connectionPool; +public final class GithubRepoRepoImpl implements GithubRepoRepo, ConnectionProvidedRepo { + @Getter + private final ConnectionProvider connectionProvider; @Override public Mono upsert(final GithubRepo repo) { - return connectionPool - .create() - .flatMap(connection -> Mono - .from(insertStatement(connection) - .bind("$1", repo.getId()) - .bind("$2", repo.getName()) - .bind("$3", repo.getOwner().getId()) - .bind("$4", repo.getDescription()) - .execute()) - .flatMap(result -> Mono.from(result.getRowsUpdated())) - .filter(Integer.valueOf(1)::equals) - .flatMap(rowsUpdated -> Repos.closeAndReturn(connection, repo))); + return withConnection(connection -> Mono + .from(insertStatement(connection) + .bind("$1", repo.getId()) + .bind("$2", repo.getName()) + .bind("$3", repo.getOwner().getId()) + .bind("$4", repo.getDescription()) + .execute()) + .flatMap(result -> Mono.from(result.getRowsUpdated())) + .filter(Integer.valueOf(1)::equals) + .map(rowsUpdated -> repo)); } @Override @@ -43,33 +42,28 @@ public Flux upsert(final Collection repos) { return Flux.empty(); } - return connectionPool - .create() - .flatMapMany(connection -> { - final var statement = insertStatement(connection); - repos.forEach(repo -> statement - .bind("$1", repo.getId()) - .bind("$2", repo.getName()) - .bind("$3", repo.getOwner().getId()) - .bind("$4", repo.getDescription()) - .add()); - statement.fetchSize(repos.size()); + return manyWithConnection(connection -> { + final var statement = insertStatement(connection); + repos.forEach(repo -> statement + .bind("$1", repo.getId()) + .bind("$2", repo.getName()) + .bind("$3", repo.getOwner().getId()) + .bind("$4", repo.getDescription()) + .add()); - return statement.execute() - .flatMap(PostgresqlResult::getRowsUpdated) - .thenMany(Repos.closeAndReturn(connection, repos)); - }); + return statement.execute() + .flatMap(PostgresqlResult::getRowsUpdated) + .thenMany(Flux.fromIterable(repos)); + }); } @Override public Mono findById(final Long id) { - return connectionPool - .create() - .flatMap(connection -> Mono - .from(selectStatement(connection) - .bind("$1", id) - .execute()) - .flatMap(result -> Repos.convert(result, connection, this::convert))); + return withConnection(connection -> Mono + .from(selectStatement(connection) + .bind("$1", id) + .execute()) + .flatMap(result -> ConnectionProvidedRepo.convert(result, this::convert))); } private static PostgresqlStatement insertStatement(final Connection connection) { diff --git a/api/src/main/java/org/accula/api/db/repo/GithubUserRepoImpl.java b/api/src/main/java/org/accula/api/db/repo/GithubUserRepoImpl.java index 0c93da9d..28424f0e 100644 --- a/api/src/main/java/org/accula/api/db/repo/GithubUserRepoImpl.java +++ b/api/src/main/java/org/accula/api/db/repo/GithubUserRepoImpl.java @@ -1,10 +1,9 @@ package org.accula.api.db.repo; -import io.r2dbc.pool.ConnectionPool; import io.r2dbc.postgresql.api.PostgresqlResult; import io.r2dbc.postgresql.api.PostgresqlStatement; import io.r2dbc.spi.Connection; -import io.r2dbc.spi.Statement; +import lombok.Getter; import lombok.RequiredArgsConstructor; import org.accula.api.db.model.GithubUser; import org.springframework.stereotype.Component; @@ -20,19 +19,16 @@ @Component @SuppressWarnings("PMD.ConfusingTernary") @RequiredArgsConstructor -public final class GithubUserRepoImpl implements GithubUserRepo { - private final ConnectionPool connectionPool; +public final class GithubUserRepoImpl implements GithubUserRepo, ConnectionProvidedRepo { + @Getter + private final ConnectionProvidedRepo.ConnectionProvider connectionProvider; @Override public Mono upsert(final GithubUser user) { - return connectionPool - .create() - .flatMap(connection -> Mono - .from(applyInsertBindings(user, insertStatement(connection)) - .execute()) - .flatMap(result -> Mono.from(result.getRowsUpdated())) - .filter(Integer.valueOf(1)::equals) - .flatMap(rowsUpdated -> Repos.closeAndReturn(connection, user))); + return withConnection(connection -> applyInsertBindings(user, insertStatement(connection)) + .execute() + .flatMap(PostgresqlResult::getRowsUpdated) + .then(Mono.just(user))); } @Override @@ -41,35 +37,29 @@ public Flux upsert(final Collection users) { return Flux.empty(); } - return connectionPool - .create() - .flatMapMany(connection -> { - final var statement = insertStatement(connection); - users.forEach(user -> applyInsertBindings(user, statement).add()); - statement.fetchSize(users.size()); + return manyWithConnection(connection -> { + final var statement = insertStatement(connection); + users.forEach(user -> applyInsertBindings(user, statement).add()); - return statement.execute() - .flatMap(PostgresqlResult::getRowsUpdated) - .thenMany(Repos.closeAndReturn(connection, users)); - }); + return statement.execute() + .flatMap(PostgresqlResult::getRowsUpdated) + .thenMany(Flux.fromIterable(users)); + }); } @Override public Mono findById(final Long id) { - return connectionPool - .create() - .flatMap(connection -> Mono - .from(selectStatement(connection) - .bind("$1", id) - .execute()) - .flatMap(result -> Repos - .convert(result, connection, row -> new GithubUser( - Objects.requireNonNull(row.get("id", Long.class)), - Objects.requireNonNull(row.get("login", String.class)), - Objects.requireNonNull(row.get("name", String.class)), - Objects.requireNonNull(row.get("avatar", String.class)), - Objects.requireNonNull(row.get("is_org", Boolean.class)) - )))); + return withConnection(connection -> Mono + .from(selectStatement(connection) + .bind("$1", id) + .execute()) + .flatMap(result -> ConnectionProvidedRepo.convert(result, row -> new GithubUser( + Objects.requireNonNull(row.get("id", Long.class)), + Objects.requireNonNull(row.get("login", String.class)), + Objects.requireNonNull(row.get("name", String.class)), + Objects.requireNonNull(row.get("avatar", String.class)), + Objects.requireNonNull(row.get("is_org", Boolean.class)) + )))); } private static PostgresqlStatement insertStatement(final Connection connection) { @@ -93,7 +83,7 @@ private static PostgresqlStatement selectStatement(final Connection connection) """); } - static Statement applyInsertBindings(final GithubUser user, final Statement statement) { + static PostgresqlStatement applyInsertBindings(final GithubUser user, final PostgresqlStatement statement) { statement.bind("$1", user.getId()); statement.bind("$2", user.getLogin()); if (user.getName() != null && !user.getName().isBlank()) { diff --git a/api/src/main/java/org/accula/api/db/repo/ProjectRepoImpl.java b/api/src/main/java/org/accula/api/db/repo/ProjectRepoImpl.java index 82de90bb..a20b637b 100644 --- a/api/src/main/java/org/accula/api/db/repo/ProjectRepoImpl.java +++ b/api/src/main/java/org/accula/api/db/repo/ProjectRepoImpl.java @@ -1,10 +1,10 @@ package org.accula.api.db.repo; -import io.r2dbc.pool.ConnectionPool; import io.r2dbc.postgresql.api.PostgresqlResult; import io.r2dbc.postgresql.api.PostgresqlStatement; import io.r2dbc.spi.Connection; import io.r2dbc.spi.Row; +import lombok.Getter; import lombok.RequiredArgsConstructor; import org.accula.api.db.model.GithubRepo; import org.accula.api.db.model.Project; @@ -19,14 +19,14 @@ */ @Component @RequiredArgsConstructor -public final class ProjectRepoImpl implements ProjectRepo { - private final ConnectionPool connectionPool; +public final class ProjectRepoImpl implements ProjectRepo, ConnectionProvidedRepo { + @Getter + private final ConnectionProvider connectionProvider; @Override public Mono notExists(final Long githubRepoId) { - return connectionPool - .create() - .flatMap(connection -> Mono.from(connection + return withConnection(connection -> Mono + .from(connection .createStatement(""" SELECT NOT exists(SELECT 0 FROM project @@ -34,100 +34,90 @@ SELECT NOT exists(SELECT 0 """) .bind("$1", githubRepoId) .execute()) - .flatMap(result -> Repos.closeAndGet(connection, result, "not_exists", Boolean.class))); + .flatMap(result -> ConnectionProvidedRepo.column(result, "not_exists", Boolean.class))); } @Override public Mono upsert(final GithubRepo githubRepo, final User creator) { - return connectionPool - .create() - .flatMap(connection -> Mono - .from(connection - .createStatement(""" - WITH upserted_gh_repo AS ( - INSERT INTO repo_github (id, name, owner_id, description) - VALUES ($1, $2, $3, $4) - ON CONFLICT (id) DO UPDATE - SET name = $2, - owner_id = $3, - description = $4 - RETURNING id - ) - INSERT INTO project (github_repo_id, creator_id) - SELECT id, $5 - FROM upserted_gh_repo - ON CONFLICT (github_repo_id) DO UPDATE - SET creator_id = $5 - RETURNING id - """) - .bind("$1", githubRepo.getId()) - .bind("$2", githubRepo.getName()) - .bind("$3", githubRepo.getOwner().getId()) - .bind("$4", githubRepo.getDescription()) - .bind("$5", creator.getId()) - .execute()) - .flatMap(result -> Repos - .convert(result, connection, row -> Project.builder() - .id(Converters.value(row, "id", Long.class)) - .githubRepo(githubRepo) - .creator(creator) - .build() - ))); + return withConnection(connection -> Mono + .from(connection + .createStatement(""" + WITH upserted_gh_repo AS ( + INSERT INTO repo_github (id, name, owner_id, description) + VALUES ($1, $2, $3, $4) + ON CONFLICT (id) DO UPDATE + SET name = $2, + owner_id = $3, + description = $4 + RETURNING id + ) + INSERT INTO project (github_repo_id, creator_id) + SELECT id, $5 + FROM upserted_gh_repo + ON CONFLICT (github_repo_id) DO UPDATE + SET creator_id = $5 + RETURNING id + """) + .bind("$1", githubRepo.getId()) + .bind("$2", githubRepo.getName()) + .bind("$3", githubRepo.getOwner().getId()) + .bind("$4", githubRepo.getDescription()) + .bind("$5", creator.getId()) + .execute()) + .flatMap(result -> ConnectionProvidedRepo + .convert(result, row -> Project.builder() + .id(Converters.value(row, "id", Long.class)) + .githubRepo(githubRepo) + .creator(creator) + .build() + ))); } @Override public Mono findById(final Long id) { - return connectionPool - .create() - .flatMap(connection -> Mono - .from(selectByIdStatement(connection) - .bind("$1", id) - .execute()) - .flatMap(result -> Repos.convert(result, connection, this::convert))); + return withConnection(connection -> Mono + .from(selectByIdStatement(connection) + .bind("$1", id) + .execute()) + .flatMap(result -> ConnectionProvidedRepo.convert(result, this::convert))); } @Override public Mono idByRepoId(final Long repoId) { - return connectionPool - .create() - .flatMap(connection -> Mono - .from(connection - .createStatement(""" - SELECT id - FROM project - WHERE github_repo_id = $1 - """) - .bind("$1", repoId) - .execute()) - .flatMap(result -> Repos.closeAndGet(connection, result, "id", Long.class))); + return withConnection(connection -> Mono + .from(connection + .createStatement(""" + SELECT id + FROM project + WHERE github_repo_id = $1 + """) + .bind("$1", repoId) + .execute()) + .flatMap(result -> ConnectionProvidedRepo.column(result, "id", Long.class))); } @Override public Flux getTop(final int count) { - return connectionPool - .create() - .flatMapMany(connection -> Mono - .from(selectTopStatement(connection) - .bind("$1", count) - .execute()) - .flatMapMany(result -> Repos.convertMany(result, connection, this::convert))); + return manyWithConnection(connection -> Mono + .from(selectTopStatement(connection) + .bind("$1", count) + .execute()) + .flatMapMany(result -> ConnectionProvidedRepo.convertMany(result, this::convert))); } @Override public Mono delete(final Long id, final Long creatorId) { - return connectionPool - .create() - .flatMap(connection -> Mono - .from(((PostgresqlStatement) connection - .createStatement(""" - DELETE FROM project - WHERE id = $1 AND creator_id = $2 - """)) - .bind("$1", id) - .bind("$2", creatorId) - .execute()) - .flatMap(PostgresqlResult::getRowsUpdated) - .flatMap(rowsUpdated -> Repos.closeAndReturn(connection, rowsUpdated.equals(1)))); + return withConnection(connection -> Mono + .from(((PostgresqlStatement) connection + .createStatement(""" + DELETE FROM project + WHERE id = $1 AND creator_id = $2 + """)) + .bind("$1", id) + .bind("$2", creatorId) + .execute()) + .flatMap(PostgresqlResult::getRowsUpdated) + .map(Integer.valueOf(1)::equals)); } private static PostgresqlStatement selectByIdStatement(final Connection connection) { diff --git a/api/src/main/java/org/accula/api/db/repo/PullRepoImpl.java b/api/src/main/java/org/accula/api/db/repo/PullRepoImpl.java index d70f3f39..741ab3de 100644 --- a/api/src/main/java/org/accula/api/db/repo/PullRepoImpl.java +++ b/api/src/main/java/org/accula/api/db/repo/PullRepoImpl.java @@ -1,10 +1,10 @@ package org.accula.api.db.repo; -import io.r2dbc.pool.ConnectionPool; import io.r2dbc.postgresql.api.PostgresqlResult; import io.r2dbc.postgresql.api.PostgresqlStatement; import io.r2dbc.spi.Connection; import io.r2dbc.spi.Row; +import lombok.Getter; import lombok.RequiredArgsConstructor; import org.accula.api.db.model.Pull; import org.intellij.lang.annotations.Language; @@ -20,19 +20,16 @@ */ @Component @RequiredArgsConstructor -public final class PullRepoImpl implements PullRepo { - private final ConnectionPool connectionPool; +public final class PullRepoImpl implements PullRepo, ConnectionProvidedRepo { + @Getter + private final ConnectionProvider connectionProvider; @Override public Mono upsert(final Pull pull) { - return connectionPool - .create() - .flatMap(connection -> Mono - .from(applyInsertBindings(insertStatement(connection), pull) - .execute()) - .flatMap(PostgresqlResult::getRowsUpdated) - .filter(Integer.valueOf(1)::equals) - .flatMap(rowsUpdated -> Repos.closeAndReturn(connection, pull))); + return withConnection(connection -> applyInsertBindings(insertStatement(connection), pull) + .execute() + .flatMap(PostgresqlResult::getRowsUpdated) + .then(Mono.just(pull))); } @Override @@ -41,78 +38,70 @@ public Flux upsert(final Collection pulls) { return Flux.empty(); } - return connectionPool - .create() - .flatMapMany(connection -> { - final var statement = insertStatement(connection); - pulls.forEach(pull -> applyInsertBindings(statement, pull).add()); - statement.fetchSize(pulls.size()); + return manyWithConnection(connection -> { + final var statement = insertStatement(connection); + pulls.forEach(pull -> applyInsertBindings(statement, pull).add()); - return statement.execute() - .flatMap(PostgresqlResult::getRowsUpdated) - .thenMany(Repos.closeAndReturn(connection, pulls)); - }); + return statement.execute() + .flatMap(PostgresqlResult::getRowsUpdated) + .thenMany(Flux.fromIterable(pulls)); + }); } @Override public Mono findById(final Long id) { - return connectionPool - .create() - .flatMap(connection -> Mono - .from(selectByIdStatement(connection) - .bind("$1", id) - .execute()) - .flatMap(result -> Repos.convert(result, connection, this::convert))); + return withConnection(connection -> Mono + .from(selectByIdStatement(connection) + .bind("$1", id) + .execute()) + .flatMap(result -> ConnectionProvidedRepo.convert(result, this::convert))); } @Override public Flux findById(final Collection ids) { - return connectionPool - .create() - .flatMapMany(connection -> { - final var statement = selectByIdStatement(connection); - ids.forEach(id -> statement - .bind("$1", id) - .add()); - statement.fetchSize(ids.size()); + if (ids.isEmpty()) { + return Flux.empty(); + } + + return manyWithConnection(connection -> { + final var statement = selectByIdStatement(connection); + ids.forEach(id -> statement + .bind("$1", id) + .add()); - return Repos.convertMany(statement.execute(), connection, this::convert); - }); + return statement + .execute() + .flatMap(result -> ConnectionProvidedRepo.convert(result, this::convert)); + }); } @Override public Mono findByNumber(final Long projectId, final Integer number) { - return connectionPool - .create() - .flatMap(connection -> Mono - .from(selectByNumberStatement(connection) - .bind("$1", projectId) - .bind("$2", number) - .execute()) - .flatMap(result -> Repos.convert(result, connection, this::convert))); + return withConnection(connection -> Mono + .from(selectByNumberStatement(connection) + .bind("$1", projectId) + .bind("$2", number) + .execute()) + .flatMap(result -> ConnectionProvidedRepo.convert(result, this::convert))); } @Override public Flux findUpdatedEarlierThan(final Long projectId, final Integer number) { - return connectionPool - .create() - .flatMapMany(connection -> Mono - .from(selectUpdatedEarlierStatement(connection) - .bind("$1", projectId) - .bind("$2", number) - .execute()) - .flatMapMany(result -> Repos.convertMany(result, connection, this::convert))); + return manyWithConnection(connection -> Mono + .from(selectUpdatedEarlierStatement(connection) + .bind("$1", projectId) + .bind("$2", number) + .execute()) + .flatMapMany(result -> ConnectionProvidedRepo.convertMany(result, this::convert))); } @Override public Flux findByProjectId(final Long projectId) { - return connectionPool - .create() - .flatMapMany(connection -> Mono - .from(selectByProjectIdStatement(connection) - .bind("$1", projectId) - .execute()) - .flatMapMany(result -> Repos.convertMany(result, connection, this::convert))); + return manyWithConnection(connection -> Mono + .from(selectByProjectIdStatement(connection) + .bind("$1", projectId) + .execute()) + .flatMapMany(result -> ConnectionProvidedRepo.convertMany(result, this::convert))); } private static PostgresqlStatement insertStatement(final Connection connection) { diff --git a/api/src/main/java/org/accula/api/db/repo/Repos.java b/api/src/main/java/org/accula/api/db/repo/Repos.java deleted file mode 100644 index 247781ff..00000000 --- a/api/src/main/java/org/accula/api/db/repo/Repos.java +++ /dev/null @@ -1,58 +0,0 @@ -package org.accula.api.db.repo; - -import io.r2dbc.spi.Connection; -import io.r2dbc.spi.Result; -import io.r2dbc.spi.Row; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.util.Objects; -import java.util.function.Function; - -/** - * @author Anton Lamtev - */ -final class Repos { - private Repos() { - } - - static Mono closeAndReturn(final Connection connection, final T value) { - return Mono.from(connection.close()).thenReturn(value); - } - - static Flux closeAndReturn(final Connection connection, final Flux values) { - return Mono.from(connection.close()).thenMany(values); - } - - static Flux closeAndReturn(final Connection connection, final Iterable values) { - return Mono.from(connection.close()).thenMany(Flux.fromIterable(values)); - } - - static Mono column(final Result result, final String name, final Class clazz) { - return Mono.from(result - .map((row, metadata) -> Objects.requireNonNull(row.get(name, clazz)))); - } - - static Mono closeAndGet(final Connection connection, final Result result, final String name, final Class clazz) { - return column(result, name, clazz) - .flatMap(column -> closeAndReturn(connection, column)); - } - - static Mono convert(final Result result, final Connection connection, final Function transform) { - return Mono.from(result.map((row, metadata) -> transform.apply(row))) - .flatMap(res -> closeAndReturn(connection, res)); - } - - static Flux convertMany(final Result result, final Connection connection, final Function transform) { - final var results = Flux.from(result.map((row, metadata) -> transform.apply(row))).cache(); - return results.thenMany(closeAndReturn(connection, results)); - } - - static Flux convertMany(final Flux result, final Connection connection, final Function transform) { - final var results = result - .flatMap(res -> res - .map((row, metadata) -> transform.apply(row))) - .cache(); - return results.thenMany(closeAndReturn(connection, results)); - } -} diff --git a/api/src/main/java/org/accula/api/db/repo/UserRepoImpl.java b/api/src/main/java/org/accula/api/db/repo/UserRepoImpl.java index 9be393f5..77b4c484 100644 --- a/api/src/main/java/org/accula/api/db/repo/UserRepoImpl.java +++ b/api/src/main/java/org/accula/api/db/repo/UserRepoImpl.java @@ -1,8 +1,8 @@ package org.accula.api.db.repo; -import io.r2dbc.pool.ConnectionPool; +import io.r2dbc.postgresql.api.PostgresqlStatement; import io.r2dbc.spi.Row; -import io.r2dbc.spi.Statement; +import lombok.Getter; import lombok.RequiredArgsConstructor; import org.accula.api.db.model.GithubUser; import org.accula.api.db.model.User; @@ -17,67 +17,63 @@ */ @Component @RequiredArgsConstructor -public final class UserRepoImpl implements UserRepo { +public final class UserRepoImpl implements UserRepo, ConnectionProvidedRepo { private final Set onUpserts = ConcurrentHashMap.newKeySet(); - private final ConnectionPool connectionPool; + @Getter + private final ConnectionProvider connectionProvider; @Override public Mono upsert(final GithubUser githubUser, final String githubAccessToken) { - return connectionPool - .create() - .flatMap(connection -> Mono - .from(applyInsertBindings(githubUser, githubAccessToken, connection - .createStatement(""" - WITH upserted_gh_user AS ( - INSERT INTO user_github (id, login, name, avatar, is_org) - VALUES ($1, $2, $3, $4, $5) - ON CONFLICT (id) DO UPDATE - SET login = $2, - name = COALESCE($3, user_github.name), - avatar = $4, - is_org = $5 - RETURNING id - ) - INSERT INTO user_ (github_id, github_access_token) - SELECT id, $6 - FROM upserted_gh_user - ON CONFLICT (github_id) DO UPDATE - SET github_access_token = $6 - RETURNING id - """)) - .execute()) - .flatMap(result -> Repos - .convert(result, connection, row -> new User( - Converters.value(row, "id", Long.class), - githubAccessToken, - githubUser - ))) - .doOnSuccess(user -> onUpserts - .forEach(onUpsert -> onUpsert.onUpsert(user.getId())))); + return withConnection(connection -> Mono + .from(applyInsertBindings(githubUser, githubAccessToken, (PostgresqlStatement) connection + .createStatement(""" + WITH upserted_gh_user AS ( + INSERT INTO user_github (id, login, name, avatar, is_org) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (id) DO UPDATE + SET login = $2, + name = COALESCE($3, user_github.name), + avatar = $4, + is_org = $5 + RETURNING id + ) + INSERT INTO user_ (github_id, github_access_token) + SELECT id, $6 + FROM upserted_gh_user + ON CONFLICT (github_id) DO UPDATE + SET github_access_token = $6 + RETURNING id + """)) + .execute()) + .flatMap(result -> ConnectionProvidedRepo + .convert(result, row -> new User( + Converters.value(row, "id", Long.class), + githubAccessToken, + githubUser + )))) + .doOnSuccess(user -> onUpserts + .forEach(onUpsert -> onUpsert.onUpsert(user.getId()))); } @Override public Mono findById(final Long id) { - return connectionPool - .create() - .flatMap(connection -> Mono - .from(connection - .createStatement(""" - SELECT u.id, - ug.id AS github_id, - ug.login AS github_login, - ug.name AS github_name, - ug.avatar AS github_avatar, - ug.is_org AS github_org, - u.github_access_token - FROM user_ u - JOIN user_github ug ON u.github_id = ug.id - WHERE u.id = $1 - """) - .bind("$1", id) - .execute()) - .flatMap(result -> Repos - .convert(result, connection, this::convert))); + return withConnection(connection -> Mono + .from(connection + .createStatement(""" + SELECT u.id, + ug.id AS github_id, + ug.login AS github_login, + ug.name AS github_name, + ug.avatar AS github_avatar, + ug.is_org AS github_org, + u.github_access_token + FROM user_ u + JOIN user_github ug ON u.github_id = ug.id + WHERE u.id = $1 + """) + .bind("$1", id) + .execute()) + .flatMap(result -> ConnectionProvidedRepo.convert(result, this::convert))); } @Override @@ -85,9 +81,9 @@ public void addOnUpsert(final OnUpsert onUpsert) { onUpserts.add(onUpsert); } - private static Statement applyInsertBindings(final GithubUser githubUser, - final String githubAccessToken, - final Statement statement) { + private static PostgresqlStatement applyInsertBindings(final GithubUser githubUser, + final String githubAccessToken, + final PostgresqlStatement statement) { return GithubUserRepoImpl .applyInsertBindings(githubUser, statement) .bind("$6", githubAccessToken); diff --git a/api/src/main/java/org/accula/api/detector/PrimitiveCloneDetector.java b/api/src/main/java/org/accula/api/detector/PrimitiveCloneDetector.java index 2d46793a..c6df6950 100644 --- a/api/src/main/java/org/accula/api/detector/PrimitiveCloneDetector.java +++ b/api/src/main/java/org/accula/api/detector/PrimitiveCloneDetector.java @@ -8,6 +8,7 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -31,7 +32,8 @@ public Flux> findClones(final Flux final Mono>> target = targetFiles .map(this::lineToSnippetsMap) .collectList() - .map(PrimitiveCloneDetector::reduceMaps); + .map(PrimitiveCloneDetector::reduceMaps) + .cache(); return sourceFiles .flatMap(source -> target.zipWith(Mono.just(source))) .flatMap(targetAndSource -> Flux.fromIterable(findClonesInFile(targetAndSource.getT1(), targetAndSource.getT2()))) @@ -68,7 +70,11 @@ private List> findClonesInFile(final Map clones.add(Tuples.of(s, source))); } } - return tryMerge(clones); + + final var sorted = clones.stream() + .sorted(Comparator.comparing(t -> t.getT1().toString())) + .collect(Collectors.toList()); + return tryMerge(sorted); } private List> tryMerge(final List> clones) { diff --git a/api/src/main/java/org/accula/api/handlers/GithubWebhookHandler.java b/api/src/main/java/org/accula/api/handlers/GithubWebhookHandler.java index d36ac142..e687d380 100644 --- a/api/src/main/java/org/accula/api/handlers/GithubWebhookHandler.java +++ b/api/src/main/java/org/accula/api/handlers/GithubWebhookHandler.java @@ -1,6 +1,7 @@ package org.accula.api.handlers; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.accula.api.code.CodeLoader; import org.accula.api.code.FileFilter; import org.accula.api.db.model.Clone; @@ -25,6 +26,7 @@ * @author Vadim Dyachkov */ @Component +@Slf4j @RequiredArgsConstructor public final class GithubWebhookHandler { private static final String GITHUB_EVENT = "X-GitHub-Event"; @@ -46,6 +48,10 @@ public Mono webhook(final ServerRequest request) { return request .bodyToMono(GithubApiHookPayload.class) .flatMap(this::processPayload) + .onErrorResume(e -> { + log.error("Error during payload processing: ", e); + return Mono.empty(); + }) .flatMap(p -> ServerResponse.ok().build()); } @@ -59,22 +65,22 @@ public Mono processPayload(final GithubApiHookPayload payload) { final var targetFiles = savedPull .map(Pull::getHead) - .flatMapMany(head -> loader.getFiles(head, FileFilter.SRC_JAVA)) - .cache(); + .flatMapMany(head -> loader.getFiles(head, FileFilter.JAVA)); final var sourceFiles = savedPull .flatMapMany(pull -> pullRepo.findUpdatedEarlierThan(pull.getProjectId(), pull.getNumber())) .map(Pull::getHead) - .flatMap(head -> loader.getFiles(head, FileFilter.SRC_JAVA)) + .flatMap(head -> loader.getFiles(head, FileFilter.JAVA)) .cache(); final var cloneFlux = detector .findClones(targetFiles, sourceFiles) - .map(this::convert) - .subscribeOn(processingScheduler); + .map(this::convert); - return cloneFlux.collectList() + final var saveClones = cloneFlux.collectList() .flatMap(clones -> cloneRepo.insert(clones).then()); + + return Mono.when(savedPull, saveClones); } private Clone convert(final Tuple2 clone) { diff --git a/api/src/main/java/org/accula/api/handlers/ProjectsHandler.java b/api/src/main/java/org/accula/api/handlers/ProjectsHandler.java index fbde3d12..a217f67c 100644 --- a/api/src/main/java/org/accula/api/handlers/ProjectsHandler.java +++ b/api/src/main/java/org/accula/api/handlers/ProjectsHandler.java @@ -99,7 +99,7 @@ public Mono create(final ServerRequest request) { .contentType(APPLICATION_JSON) .bodyValue(project)) .doOnSuccess(response -> log.debug("{}: {}", request, response.statusCode())) - .doOnError(t -> log.error("{}: {}", request, t)) + .doOnError(t -> log.error("{}: ", request, t)) .onErrorResume(e -> e instanceof CreateProjectException, e -> switch (CreateProjectException.error(e)) { case BAD_FORMAT, INVALID_URL, WRONG_URL, ALREADY_EXISTS -> ServerResponse @@ -155,9 +155,11 @@ private Mono saveProjectData(final Tuple4 log.error("Error saving github user: {}", projectGithubRepo.getOwner(), e)) .filterWhen(repoOwner -> projectRepo.notExists(projectGithubRepo.getId())) .switchIfEmpty(Mono.error(CreateProjectException.ALREADY_EXISTS)) .flatMap(repoOwner -> projectRepo.upsert(projectGithubRepo, currentUser)) + .doOnError(e -> log.error("Error saving Project: {}-{}", projectGithubRepo.getOwner(), currentUser, e)) .flatMap(project -> projectUpdater.update(project.getId(), githubApiPulls) .map(openPullCount -> modelToDtoConverter.convert(project, openPullCount))); }); diff --git a/api/src/main/java/org/accula/api/handlers/util/ProjectUpdater.java b/api/src/main/java/org/accula/api/handlers/util/ProjectUpdater.java index 74c60800..3f5fda5d 100644 --- a/api/src/main/java/org/accula/api/handlers/util/ProjectUpdater.java +++ b/api/src/main/java/org/accula/api/handlers/util/ProjectUpdater.java @@ -1,6 +1,7 @@ package org.accula.api.handlers.util; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.accula.api.converter.GithubApiToModelConverter; import org.accula.api.db.model.CommitSnapshot; import org.accula.api.db.model.GithubRepo; @@ -16,6 +17,7 @@ import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; +import java.util.Arrays; import java.util.HashSet; import java.util.Set; @@ -23,6 +25,7 @@ * @author Anton Lamtev */ @Component +@Slf4j @RequiredArgsConstructor public final class ProjectUpdater { private final Scheduler processingScheduler = Schedulers.boundedElastic(); @@ -65,6 +68,8 @@ public Mono update(final Long projectId, final GithubApiPull[] githubAp .thenMany(commitSnapshotRepo.mapToPulls(commitSnapshots)) .then(Mono.just(openPullCount)); }) + .doOnSuccess(success -> log.info("Project has been updated successfully with pulls={}", Arrays.toString(githubApiPulls))) + .doOnError(e -> log.error("Failed to update project with pulls={}", Arrays.toString(githubApiPulls), e)) .subscribeOn(processingScheduler); } @@ -88,6 +93,8 @@ public Mono update(final Long projectId, final GithubApiPull githubApiPull .thenMany(commitSnapshotRepo.mapToPulls(commitSnapshots)) .then(Mono.just(pull)); }) + .doOnSuccess(success -> log.info("Project has been updated successfully with pull={}", githubApiPull)) + .doOnError(e -> log.error("Failed to update project with pull={}", githubApiPull, e)) .subscribeOn(processingScheduler); } diff --git a/api/src/main/resources/application.yml b/api/src/main/resources/application.yml index 0762c58a..9ca46c7d 100644 --- a/api/src/main/resources/application.yml +++ b/api/src/main/resources/application.yml @@ -54,9 +54,9 @@ accula: database: ${DB_NAME} pool: - maxIdleTime: 5s - minSize: 2 - maxSize: 20 + maxIdleTime: 1m + minSize: 10 + maxSize: 195 reposPath: ${REPOS_PATH} diff --git a/api/src/main/resources/log4j.properties b/api/src/main/resources/log4j.properties index 61bd973b..f07f38e8 100644 --- a/api/src/main/resources/log4j.properties +++ b/api/src/main/resources/log4j.properties @@ -1,6 +1,8 @@ -log4j.rootLogger=DEBUG,stdout +log4j.rootLogger=INFO,stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss:SSS} %-5p %c - %m%n +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss:SSS} %-5p %c - [%t]: %m%n log4j.logger.org.springframework.data.r2dbc=DEBUG +log4j.logger.org.springframework=DEBUG log4j.logger.org.accula.api=DEBUG +log4j.logger.io.r2dbc=DEBUG diff --git a/api/src/test/java/org/accula/api/detector/CloneDetectorTest.java b/api/src/test/java/org/accula/api/detector/CloneDetectorTest.java index 90c554c6..ac6c743c 100644 --- a/api/src/test/java/org/accula/api/detector/CloneDetectorTest.java +++ b/api/src/test/java/org/accula/api/detector/CloneDetectorTest.java @@ -80,7 +80,7 @@ void testReal(@TempDir final File tempDir) { List> clones = detector.findClones(targetFiles, sourceFiles).collectList().block(); assertNotNull(clones); clones.forEach(t -> printClone(codeLoader, t)); - assertEquals(5, clones.size()); + assertEquals(6, clones.size()); } private void printClone(CodeLoader codeLoader, Tuple2 clone) { diff --git a/docker-compose.yml b/docker-compose.yml index ad6a23fd..9167cbc1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -34,11 +34,13 @@ services: - JWT_SIGNATURE_PRIVATE_KEY=/app/keys/accula.private.der volumes: - ./repos:/app/repos + - ./keys:/app/keys postgres: - image: postgres:11 + image: postgres:12.3-alpine environment: - POSTGRES_USER=postgres - POSTGRES_PASSWORD=postgres - POSTGRES_DB=accula + command: postgres -c 'max_connections=200' -c 'shared_buffers=1GB' volumes: - ./data/postgres:/var/lib/postgresql/data diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index a4f0001d..622ab64a 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-6.4.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-6.5-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists