From cb18027a55b06df27a676a6bada77bed8c8e708e Mon Sep 17 00:00:00 2001 From: Ilias Xenogiannis Date: Fri, 30 Aug 2024 18:58:02 +0300 Subject: [PATCH] Add support for Spark iceberg --- .github/workflows/pr_tests.yml | 58 ++- .github/workflows/spark_deployment/Dockerfile | 34 ++ .../spark_deployment/build_and_push.sh | 20 + .../spark_deployment/docker-compose.yml | 66 ++++ .../spark_deployment/spark-defaults.conf | 44 +++ .../.scripts/integration_tests.sh | 2 +- integration_tests/ci/profiles.yml | 21 +- integration_tests/dbt_project.yml | 23 +- .../snowplow_base_events_this_run_actual.sql | 13 +- ...ase_sessions_lifecycle_manifest_actual.sql | 16 +- .../base/source/spark/snowplow_events_stg.sql | 346 ++++++++++++++++++ .../default_strategy/test_incremental.sql | 18 +- .../test_incremental_w_lookback_disabled.sql | 5 +- .../test_snowplow_delete_from_manifest.sql | 5 +- .../base_create_snowplow_events_this_run.sql | 68 ++++ macros/utils/cross_db/get_field.sql | 8 +- macros/utils/cross_db/get_string_agg.sql | 30 ++ macros/utils/cross_db/timestamp_functions.sql | 41 ++- macros/utils/get_value_by_target_type.sql | 6 +- .../utils/snowplow_delete_from_manifest.sql | 19 +- 20 files changed, 787 insertions(+), 56 deletions(-) create mode 100644 .github/workflows/spark_deployment/Dockerfile create mode 100755 .github/workflows/spark_deployment/build_and_push.sh create mode 100644 .github/workflows/spark_deployment/docker-compose.yml create mode 100644 .github/workflows/spark_deployment/spark-defaults.conf create mode 100644 integration_tests/models/base/source/spark/snowplow_events_stg.sql diff --git a/.github/workflows/pr_tests.yml b/.github/workflows/pr_tests.yml index a63cd3db..8903af73 100644 --- a/.github/workflows/pr_tests.yml +++ b/.github/workflows/pr_tests.yml @@ -59,9 +59,10 @@ jobs: # Run tests from integration_tests sub dir working-directory: ./integration_tests strategy: + fail-fast: false matrix: dbt_version: ["1.*"] - warehouse: ["postgres", "bigquery", 
"snowflake", "databricks", "redshift"] # TODO: Add RS self-hosted runner + warehouse: ["postgres", "bigquery", "snowflake", "databricks", "redshift", "spark_iceberg"] # TODO: Add RS self-hosted runner services: postgres: image: postgres:latest @@ -82,7 +83,26 @@ jobs: steps: - name: Check out uses: actions/checkout@v3 - + - name: Configure Docker credentials + uses: docker/login-action@v2 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v1 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: eu-west-1 + - name: Set warehouse variables + id: set_warehouse + run: | + WAREHOUSE_PLATFORM=$(echo ${{ matrix.warehouse }} | cut -d'_' -f1) + WAREHOUSE_SPECIFIC=$(echo ${{ matrix.warehouse }} | cut -s -d'_' -f2) + echo "WAREHOUSE_PLATFORM=${WAREHOUSE_PLATFORM}" >> $GITHUB_ENV + echo "WAREHOUSE_SPECIFIC=${WAREHOUSE_SPECIFIC}" >> $GITHUB_ENV + echo "warehouse_platform=${WAREHOUSE_PLATFORM}" >> $GITHUB_OUTPUT + echo "warehouse_specific=${WAREHOUSE_SPECIFIC}" >> $GITHUB_OUTPUT # Remove '*' and replace '.' with '_' in DBT_VERSION & set as SCHEMA_SUFFIX. 
# SCHEMA_SUFFIX allows us to run multiple versions of dbt in parallel without overwriting the output tables - name: Set SCHEMA_SUFFIX env @@ -92,7 +112,7 @@ jobs: - name: Set DEFAULT_TARGET env run: | - echo "DEFAULT_TARGET=${{ matrix.warehouse }}" >> $GITHUB_ENV + echo "DEFAULT_TARGET=${{matrix.warehouse}}" >> $GITHUB_ENV - name: Python setup uses: actions/setup-python@v4 @@ -103,32 +123,46 @@ jobs: uses: actions/cache@v3 with: path: ~/.cache/pip - key: ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{ matrix.warehouse }} + key: ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{env.WAREHOUSE_PLATFORM}} restore-keys: | - ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{ matrix.warehouse }} + ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{env.WAREHOUSE_PLATFORM}} # Install latest patch version. Upgrade if cache contains old patch version. - name: Install dependencies run: | pip install wheel setuptools - pip install -Iv dbt-${{ matrix.warehouse }}==${{ matrix.dbt_version }} --upgrade + pip install -Iv dbt-${{env.WAREHOUSE_PLATFORM}}==${{ matrix.dbt_version }} --upgrade dbt deps - if: ${{matrix.warehouse != 'spark'}} + if: ${{env.WAREHOUSE_PLATFORM != 'spark'}} - name: Install spark dependencies run: | pip install --upgrade pip wheel setuptools - pip install -Iv "dbt-${{ matrix.warehouse }}[ODBC]"==${{ matrix.dbt_version }} --upgrade + pip install -Iv "dbt-${{ env.WAREHOUSE_PLATFORM }}[PyHive]"==${{ matrix.dbt_version }} --upgrade dbt deps - if: ${{matrix.warehouse == 'spark'}} + if: ${{env.WAREHOUSE_PLATFORM == 'spark'}} + + - name: Install Docker Compose + run: | + sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose + sudo chmod +x /usr/local/bin/docker-compose + + - name: Build and start Spark cluster + working-directory: .github/workflows/spark_deployment + run: | + docker-compose up -d + echo "Waiting for Spark services to start..." 
+ sleep 90 + if: ${{env.WAREHOUSE_PLATFORM == 'spark'}} + - name: "Pre-test: Drop ci schemas" run: | - dbt run-operation post_ci_cleanup --target ${{ matrix.warehouse }} + dbt run-operation post_ci_cleanup --target ${{matrix.warehouse}} - name: Run tests - run: ./.scripts/integration_tests.sh -d ${{ matrix.warehouse }} + run: ./.scripts/integration_tests.sh -d ${{matrix.warehouse}} - name: "Post-test: Drop ci schemas" run: | - dbt run-operation post_ci_cleanup --target ${{ matrix.warehouse }} + dbt run-operation post_ci_cleanup --target ${{matrix.warehouse}} diff --git a/.github/workflows/spark_deployment/Dockerfile b/.github/workflows/spark_deployment/Dockerfile new file mode 100644 index 00000000..dab57200 --- /dev/null +++ b/.github/workflows/spark_deployment/Dockerfile @@ -0,0 +1,34 @@ +FROM openjdk:11-jre-slim + +# Set environment variables +ENV SPARK_VERSION=3.5.1 +ENV HADOOP_VERSION=3.3.4 +ENV ICEBERG_VERSION=1.4.2 +ENV AWS_SDK_VERSION=1.12.581 + +# Install necessary tools +RUN apt-get update && apt-get install -y curl wget procps rsync ssh + +# Download and install Spark (from archive.apache.org, which keeps every release; downloads.apache.org drops superseded versions) +RUN wget https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz && \ + tar -xvzf spark-${SPARK_VERSION}-bin-hadoop3.tgz && \ + mv spark-${SPARK_VERSION}-bin-hadoop3 /spark && \ + rm spark-${SPARK_VERSION}-bin-hadoop3.tgz + +# Set Spark environment variables +ENV SPARK_HOME=/spark +ENV PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin + +# Download necessary JARs +RUN mkdir -p /spark/jars && \ + wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/${ICEBERG_VERSION}/iceberg-spark-runtime-3.5_2.12-${ICEBERG_VERSION}.jar -O /spark/jars/iceberg-spark-runtime.jar && \ + wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar -O /spark/jars/iceberg-aws-bundle.jar && \ + wget 
https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar -O /spark/jars/hadoop-aws.jar && \ + wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/${AWS_SDK_VERSION}/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar -O /spark/jars/aws-java-sdk-bundle.jar + +# Create directory for Spark events +RUN mkdir -p /tmp/spark-events + +WORKDIR /spark + +CMD ["bash"] \ No newline at end of file diff --git a/.github/workflows/spark_deployment/build_and_push.sh b/.github/workflows/spark_deployment/build_and_push.sh new file mode 100755 index 00000000..1be2b6d2 --- /dev/null +++ b/.github/workflows/spark_deployment/build_and_push.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +# Set variables +DOCKER_HUB_ORG="snowplow" +IMAGE_NAME="spark-s3-iceberg" +TAG="latest" + +# Build the image +echo "Building Docker image..." +docker build --platform linux/amd64 -t $DOCKER_HUB_ORG/$IMAGE_NAME:$TAG . + +# Log in to Docker Hub +echo "Logging in to Docker Hub..." +docker login + +# Push the image to Docker Hub +echo "Pushing image to Docker Hub..." 
+docker push $DOCKER_HUB_ORG/$IMAGE_NAME:$TAG + +echo "Image successfully built and pushed to Docker Hub" \ No newline at end of file diff --git a/.github/workflows/spark_deployment/docker-compose.yml b/.github/workflows/spark_deployment/docker-compose.yml new file mode 100644 index 00000000..2e8077ba --- /dev/null +++ b/.github/workflows/spark_deployment/docker-compose.yml @@ -0,0 +1,66 @@ +version: '3' + +networks: + spark-network: + driver: bridge + +services: + spark-master: + image: snowplow/spark-s3-iceberg:latest + command: ["/bin/bash", "-c", "/spark/sbin/start-master.sh -h spark-master --properties-file /spark/conf/spark-defaults.conf && tail -f /spark/logs/spark--org.apache.spark.deploy.master.Master-1-*.out"] + hostname: spark-master + ports: + - '8080:8080' + - '7077:7077' + environment: + - SPARK_LOCAL_IP=spark-master + - SPARK_MASTER_HOST=spark-master + - SPARK_MASTER_PORT=7077 + - SPARK_MASTER_OPTS="-Dspark.driver.memory=2g" + - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} + - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} + - AWS_REGION=eu-west-1 + - AWS_DEFAULT_REGION=eu-west-1 + volumes: + - ./spark-defaults.conf:/spark/conf/spark-defaults.conf + networks: + - spark-network + + spark-worker: + image: snowplow/spark-s3-iceberg:latest + command: ["/bin/bash", "-c", "sleep 10 && /spark/sbin/start-worker.sh spark://spark-master:7077 --properties-file /spark/conf/spark-defaults.conf && tail -f /spark/logs/spark--org.apache.spark.deploy.worker.Worker-*.out"] + depends_on: + - spark-master + environment: + - SPARK_WORKER_CORES=2 + - SPARK_WORKER_MEMORY=4G + - SPARK_EXECUTOR_MEMORY=3G + - SPARK_LOCAL_IP=spark-worker + - SPARK_MASTER=spark://spark-master:7077 + - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} + - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} + - AWS_REGION=eu-west-1 + - AWS_DEFAULT_REGION=eu-west-1 + volumes: + - ./spark-defaults.conf:/spark/conf/spark-defaults.conf + networks: + - spark-network + + thrift-server: + image: 
snowplow/spark-s3-iceberg:latest + command: ["/bin/bash", "-c", "sleep 30 && /spark/sbin/start-thriftserver.sh --master spark://spark-master:7077 --driver-memory 2g --executor-memory 3g --hiveconf hive.server2.thrift.port=10000 --hiveconf hive.server2.thrift.bind.host=0.0.0.0 --conf spark.sql.hive.thriftServer.async=true --conf spark.sql.hive.thriftServer.workerQueue.size=2000 --conf spark.sql.hive.thriftServer.maxWorkerThreads=100 --conf spark.sql.hive.thriftServer.minWorkerThreads=50 && tail -f /spark/logs/spark--org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-*.out"] + ports: + - '10000:10000' + depends_on: + - spark-master + - spark-worker + environment: + - SPARK_LOCAL_IP=thrift-server + - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} + - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} + - AWS_REGION=eu-west-1 + - AWS_DEFAULT_REGION=eu-west-1 + volumes: + - ./spark-defaults.conf:/spark/conf/spark-defaults.conf + networks: + - spark-network \ No newline at end of file diff --git a/.github/workflows/spark_deployment/spark-defaults.conf b/.github/workflows/spark_deployment/spark-defaults.conf new file mode 100644 index 00000000..9052a056 --- /dev/null +++ b/.github/workflows/spark_deployment/spark-defaults.conf @@ -0,0 +1,44 @@ +spark.master spark://spark-master:7077 + +spark.sql.warehouse.dir s3a://dbt-spark-iceberg/github-integration-testing +spark.sql.catalog.glue org.apache.iceberg.spark.SparkCatalog +spark.sql.catalog.glue.catalog-impl org.apache.iceberg.aws.glue.GlueCatalog +spark.sql.catalog.glue.warehouse s3a://dbt-spark-iceberg/github-integration-testing +spark.sql.catalog.glue.io-impl org.apache.iceberg.aws.s3.S3FileIO +spark.sql.defaultCatalog glue +spark.sql.catalog.glue.database dbt-spark-iceberg + +spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem +spark.hadoop.fs.s3a.access.key +spark.hadoop.fs.s3a.secret.key +spark.hadoop.fs.s3a.endpoint s3.eu-west-1.amazonaws.com +spark.hadoop.fs.s3a.path.style.access true 
+spark.hadoop.fs.s3a.region eu-west-1 +spark.hadoop.fs.s3a.aws.region eu-west-1 + +# Enabling AWS SDK V4 signing (required for regions launched after January 2014) +spark.hadoop.com.amazonaws.services.s3.enableV4 true +spark.hadoop.fs.s3a.aws.credentials.provider org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider + +# Hive Metastore Configuration (using AWS Glue) +spark.hadoop.hive.metastore.client.factory.class com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory + +# Thrift Server Configuration for better performance in concurrent environments +spark.sql.hive.thriftServer.singleSession false +spark.sql.hive.thriftServer.async true +# spark.sql.hive.thriftServer.maxWorkerThreads 100 +# spark.sql.hive.thriftServer.minWorkerThreads 50 +# spark.sql.hive.thriftServer.workerQueue.size 2000 + +# Memory and Performance Tuning +# spark.driver.memory 2g +# spark.executor.memory 3g +# spark.worker.memory 4g +spark.network.timeout 600s +spark.sql.broadcastTimeout 600s +spark.sql.adaptive.enabled true +spark.serializer org.apache.spark.serializer.KryoSerializer + +# Logging and Debugging +spark.eventLog.enabled true +spark.eventLog.dir /tmp/spark-events diff --git a/integration_tests/.scripts/integration_tests.sh b/integration_tests/.scripts/integration_tests.sh index e7b70de5..e622f167 100755 --- a/integration_tests/.scripts/integration_tests.sh +++ b/integration_tests/.scripts/integration_tests.sh @@ -10,7 +10,7 @@ do esac done -declare -a SUPPORTED_DATABASES=("bigquery" "databricks" "postgres" "redshift" "snowflake") +declare -a SUPPORTED_DATABASES=("bigquery" "databricks" "postgres" "redshift" "snowflake" "spark_iceberg") # set to lower case DATABASE="$(echo $DATABASE | tr '[:upper:]' '[:lower:]')" diff --git a/integration_tests/ci/profiles.yml b/integration_tests/ci/profiles.yml index d75a3270..ff3eca05 100644 --- a/integration_tests/ci/profiles.yml +++ b/integration_tests/ci/profiles.yml @@ -47,7 +47,6 @@ integration_tests: token_uri: "{{ 
env_var('BIGQUERY_SERVICE_TOKEN_URI') }}" auth_provider_x509_cert_url: "{{ env_var('BIGQUERY_SERVICE_AUTH_PROVIDER_X509_CERT_URL') }}" client_x509_cert_url: "{{ env_var('BIGQUERY_SERVICE_CLIENT_X509_CERT_URL') }}" - snowflake: type: snowflake account: "{{ env_var('SNOWFLAKE_TEST_ACCOUNT') }}" @@ -58,7 +57,6 @@ integration_tests: warehouse: "{{ env_var('SNOWFLAKE_TEST_WAREHOUSE') }}" schema: "gh_sp_utils_dbt_{{ env_var('SCHEMA_SUFFIX') }}" threads: 4 - databricks: type: databricks schema: "gh_sp_utils_dbt_{{ env_var('SCHEMA_SUFFIX') }}" @@ -66,12 +64,15 @@ integration_tests: http_path: "{{ env_var('DATABRICKS_TEST_HTTP_PATH') }}" token: "{{ env_var('DATABRICKS_TEST_TOKEN') }}" threads: 4 - - spark: + spark_iceberg: type: spark - schema: "gh_sp_utils_dbt_{{ env_var('SCHEMA_SUFFIX') }}" - host: "{{ env_var('DATABRICKS_TEST_HOST') }}" - http_path: "{{ env_var('DATABRICKS_TEST_HTTP_PATH') }}" - token: "{{ env_var('DATABRICKS_TEST_TOKEN') }}" - endpoint: "{{ env_var('DATABRICKS_TEST_ENDPOINT') }}" - threads: 4 + method: thrift + host: "{{ env_var('SPARK_MASTER_HOST', 'localhost') }}" + port: 10000 + user: "{{ env_var('SPARK_USER', 'spark') }}" + schema: "{{ env_var('SPARK_SCHEMA', 'default') }}" + connect_retries: 5 + connect_timeout: 60 + threads: 1 + vars: + snowplow__datalake_file_format: iceberg \ No newline at end of file diff --git a/integration_tests/dbt_project.yml b/integration_tests/dbt_project.yml index 821e8dca..d85725ca 100644 --- a/integration_tests/dbt_project.yml +++ b/integration_tests/dbt_project.yml @@ -41,6 +41,7 @@ vars: snowplow__session_identifiers: [{"schema": "atomic", "field" : "domain_sessionid"}] snowplow__bigquery_session_identifiers: [{"schema": "contexts_com_snowplowanalytics_session_identifier_2_0_0", "field": "session_identifier"}, {"schema": "contexts_com_snowplowanalytics_session_identifier_1_0_0", "field" : "session_id"}] snowplow__databricks_session_identifiers: [{"schema": "contexts_com_snowplowanalytics_session_identifier_2", 
"field": "session_identifier"}, {"schema": "contexts_com_snowplowanalytics_session_identifier_1", "field" : "session_id"}] + snowplow__spark_session_identifiers: [{"schema": "contexts_com_snowplowanalytics_session_identifier_2", "field": "session_identifier"}, {"schema": "contexts_com_snowplowanalytics_session_identifier_1", "field" : "session_id"}] snowplow__postgres_session_identifiers: [{"schema": "contexts_com_snowplowanalytics_session_identifier_2_0_0", "field": "session_identifier", "prefix": "si_t", "alias": "sito"}, {"schema": "contexts_com_snowplowanalytics_session_identifier_1_0_0", "field" : "session_id", "prefix" : "si_o", "alias": "sido"}] snowplow__snowflake_session_identifiers: [{"schema": "contexts_com_snowplowanalytics_session_identifier_2", "field": "sessionIdentifier"}, {"schema": "contexts_com_snowplowanalytics_session_identifier_1", "field" : "sessionId"}] snowplow__custom_session_sql: 'CAST(DATE(collector_tstamp) as {{ dbt.type_string() }})' @@ -48,6 +49,7 @@ vars: snowplow__user_identifiers: [{"schema": "atomic", "field" : "domain_userid"}] snowplow__bigquery_user_identifiers: [{"schema": "contexts_com_snowplowanalytics_user_identifier_2_0_0", "field" : "user_id"}, {"schema": "contexts_com_snowplowanalytics_user_identifier_1_0_0", "field" : "user_id"}] snowplow__databricks_user_identifiers: [{"schema": "contexts_com_snowplowanalytics_user_identifier_2", "field" : "user_id"}, {"schema": "contexts_com_snowplowanalytics_user_identifier_1", "field" : "user_id"}] + snowplow__spark_user_identifiers: [{"schema": "contexts_com_snowplowanalytics_user_identifier_2", "field" : "user_id"}, {"schema": "contexts_com_snowplowanalytics_user_identifier_1", "field" : "user_id"}] snowplow__postgres_user_identifiers: [{"schema": "contexts_com_snowplowanalytics_user_identifier_2_0_0", "field" : "user_id", "prefix" : "ui_t", "alias": "uidt"}, {"schema": "contexts_com_snowplowanalytics_user_identifier_1_0_0", "field" : "user_id", "prefix": "ui_o", "alias": "uido"}] 
snowplow__snowflake_user_identifiers: [{"schema": "contexts_com_snowplowanalytics_user_identifier_2", "field" : "userId"}, {"schema": "contexts_com_snowplowanalytics_user_identifier_1", "field" : "userId"}] snowplow__quarantined_sessions: 'snowplow_base_quarantined_sessions_actual' @@ -57,6 +59,7 @@ vars: snowplow__custom_entities_or_sdes: [{"schema" : "contexts_com_snowplowanalytics_custom_entity_1_0_0", "prefix": "custom", "single_entity": true}] snowplow__bigquery_custom_sql: 'cast(contexts_com_snowplowanalytics_custom_entity_1_0_0[safe_offset(0)].contents as STRING) as custom_contents' snowplow__databricks_custom_sql: 'contexts_com_snowplowanalytics_custom_entity_1[0].contents as custom_contents' + snowplow__spark_custom_sql: 'contexts_com_snowplowanalytics_custom_entity_1[0].contents as custom_contents' snowplow__snowflake_custom_sql: 'contexts_com_snowplowanalytics_custom_entity_1[0].contents::TEXT as custom_contents' snowplow__derived_tstamp_partitioned: true snowplow__days_late_allowed: 3 @@ -73,10 +76,19 @@ vars: models: snowplow_utils_integration_tests: +schema: "snplw_utils_int_tests" + +incremental_strategy: "{{ none if target.type not in ['spark'] else 'merge' }}" + +file_format: > + {{ + var( + 'snowplow__datalake_file_format', + 'delta' if target.type != 'spark' else 'iceberg' + ) + }} materializations: snowflake_delete_insert: enabled: "{{ target.type == 'snowflake' | as_bool() }}" utils: + +materialized: "{{ 'table' if target.type in ['spark'] else 'view' }}" bigquery: enabled: "{{ target.type == 'bigquery' | as_bool() }}" cross_db: @@ -93,6 +105,11 @@ models: data_get_string_agg_grp: +materialized: table + incremental_hooks: + +materialized: "{{ 'table' if target.type in ['spark'] else 'view' }}" + + + base: +bind: false +materialized: table @@ -100,11 +117,13 @@ models: bigquery: +enabled: "{{ target.type == 'bigquery' | as_bool() }}" databricks: - +enabled: "{{ target.type in ['databricks', 'spark'] | as_bool() }}" + +enabled: "{{ target.type == 
'databricks' | as_bool() }}" default: +enabled: "{{ target.type in ['redshift', 'postgres'] | as_bool() }}" snowflake: +enabled: "{{ target.type == 'snowflake' | as_bool() }}" + spark: + +enabled: "{{ target.type == 'spark' | as_bool() }}" tests: @@ -180,10 +199,12 @@ seeds: data_incremental_expected: +column_types: id: integer + id2: integer start_tstamp: timestamp data_incremental_w_lookback_disabled_expected: +column_types: id: integer + id2: integer start_tstamp: timestamp utils: diff --git a/integration_tests/models/base/actual/snowplow_base_events_this_run_actual.sql b/integration_tests/models/base/actual/snowplow_base_events_this_run_actual.sql index 9af989fe..afbc1154 100644 --- a/integration_tests/models/base/actual/snowplow_base_events_this_run_actual.sql +++ b/integration_tests/models/base/actual/snowplow_base_events_this_run_actual.sql @@ -20,9 +20,18 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {% set snowplow_session_sql = '' %} {% if var('snowplow__custom_test', false) %} - {% set snowplow_session_identifiers = snowplow_utils.get_value_by_target_type(bigquery_val=var("snowplow__bigquery_session_identifiers"), snowflake_val=var("snowplow__snowflake_session_identifiers"), databricks_val=var("snowplow__databricks_session_identifiers"), postgres_val=var("snowplow__postgres_session_identifiers"), redshift_val=var("snowplow__postgres_session_identifiers"))%} + {% set snowplow_session_identifiers = snowplow_utils.get_value_by_target_type(bigquery_val=var("snowplow__bigquery_session_identifiers"), + snowflake_val=var("snowplow__snowflake_session_identifiers"), + databricks_val=var("snowplow__databricks_session_identifiers"), + spark_val=var("snowplow__spark_session_identifiers"), + postgres_val=var("snowplow__postgres_session_identifiers"), + redshift_val=var("snowplow__postgres_session_identifiers"))%} {% set snowplow_entities_or_sdes = var("snowplow__custom_entities_or_sdes") %} - {% set snowplow_custom_sql = 
snowplow_utils.get_value_by_target_type(bigquery_val=var("snowplow__bigquery_custom_sql"), snowflake_val=var("snowplow__snowflake_custom_sql"), databricks_val=var("snowplow__databricks_custom_sql"))%} + {% set snowplow_custom_sql = snowplow_utils.get_value_by_target_type(bigquery_val=var("snowplow__bigquery_custom_sql"), + snowflake_val=var("snowplow__snowflake_custom_sql"), + databricks_val=var("snowplow__databricks_custom_sql"), + spark_val=var("snowplow__spark_custom_sql") + )%} {% elif var('snowplow__session_test', false) %} {% set snowplow_session_sql = var("snowplow__custom_session_sql") %} {% endif %} diff --git a/integration_tests/models/base/actual/snowplow_base_sessions_lifecycle_manifest_actual.sql b/integration_tests/models/base/actual/snowplow_base_sessions_lifecycle_manifest_actual.sql index 9fe05b2a..ed2a4250 100644 --- a/integration_tests/models/base/actual/snowplow_base_sessions_lifecycle_manifest_actual.sql +++ b/integration_tests/models/base/actual/snowplow_base_sessions_lifecycle_manifest_actual.sql @@ -15,8 +15,20 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {% set snowplow_session_sql = '' %} {% if var('snowplow__custom_test', false) %} - {% set snowplow_session_identifiers = snowplow_utils.get_value_by_target_type(bigquery_val=var("snowplow__bigquery_session_identifiers"), snowflake_val=var("snowplow__snowflake_session_identifiers"), databricks_val=var("snowplow__databricks_session_identifiers"), postgres_val=var("snowplow__postgres_session_identifiers"), redshift_val=var("snowplow__postgres_session_identifiers"))%} - {% set snowplow_user_identifiers = snowplow_utils.get_value_by_target_type(bigquery_val=var("snowplow__bigquery_user_identifiers"), snowflake_val=var("snowplow__snowflake_user_identifiers"), databricks_val=var("snowplow__databricks_user_identifiers"), postgres_val=var("snowplow__postgres_user_identifiers"), redshift_val=var("snowplow__postgres_user_identifiers"))%} + {% set 
snowplow_session_identifiers = snowplow_utils.get_value_by_target_type( + bigquery_val=var("snowplow__bigquery_session_identifiers"), + snowflake_val=var("snowplow__snowflake_session_identifiers"), + databricks_val=var("snowplow__databricks_session_identifiers"), + spark_val=var("snowplow__spark_session_identifiers"), + postgres_val=var("snowplow__postgres_session_identifiers"), + redshift_val=var("snowplow__postgres_session_identifiers"))%} + {% set snowplow_user_identifiers = snowplow_utils.get_value_by_target_type( + bigquery_val=var("snowplow__bigquery_user_identifiers"), + snowflake_val=var("snowplow__snowflake_user_identifiers"), + databricks_val=var("snowplow__databricks_user_identifiers"), + spark_val=var("snowplow__spark_user_identifiers"), + postgres_val=var("snowplow__postgres_user_identifiers"), + redshift_val=var("snowplow__postgres_user_identifiers"))%} {% elif var('snowplow__session_test', false) %} {% set snowplow_session_sql = var("snowplow__custom_session_sql") %} {% endif %} diff --git a/integration_tests/models/base/source/spark/snowplow_events_stg.sql b/integration_tests/models/base/source/spark/snowplow_events_stg.sql new file mode 100644 index 00000000..bb5d1839 --- /dev/null +++ b/integration_tests/models/base/source/spark/snowplow_events_stg.sql @@ -0,0 +1,346 @@ +{# +Copyright (c) 2021-present Snowplow Analytics Ltd. All rights reserved. +This program is licensed to you under the Snowplow Personal and Academic License Version 1.0, +and you may not use this file except in compliance with the Snowplow Personal and Academic License Version 1.0. +You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 at https://docs.snowplow.io/personal-and-academic-license-1.0/ +#} +{{ + config( + tags=['base_macro'] + ) +}} + +-- page view context is given as json string in csv. 
Parse json +with prep as ( + select + app_id, + platform, + etl_tstamp, + collector_tstamp, + dvce_created_tstamp, + event, + event_id, + txn_id, + name_tracker, + v_tracker, + v_collector, + v_etl, + user_id, + user_ipaddress, + user_fingerprint, + domain_userid, + domain_sessionidx, + network_userid, + geo_country, + geo_region, + geo_city, + geo_zipcode, + geo_latitude, + geo_longitude, + geo_region_name, + ip_isp, + ip_organization, + ip_domain, + ip_netspeed, + page_url, + page_title, + page_referrer, + page_urlscheme, + page_urlhost, + page_urlport, + page_urlpath, + page_urlquery, + page_urlfragment, + refr_urlscheme, + refr_urlhost, + refr_urlport, + refr_urlpath, + refr_urlquery, + refr_urlfragment, + refr_medium, + refr_source, + refr_term, + mkt_medium, + mkt_source, + mkt_term, + mkt_content, + mkt_campaign, + se_category, + se_action, + se_label, + se_property, + se_value, + tr_orderid, + tr_affiliation, + tr_total, + tr_tax, + tr_shipping, + tr_city, + tr_state, + tr_country, + ti_orderid, + ti_sku, + ti_name, + ti_category, + ti_price, + ti_quantity, + pp_xoffset_min, + pp_xoffset_max, + pp_yoffset_min, + pp_yoffset_max, + useragent, + br_name, + br_family, + br_version, + br_type, + br_renderengine, + br_lang, + br_features_pdf, + br_features_flash, + br_features_java, + br_features_director, + br_features_quicktime, + br_features_realplayer, + br_features_windowsmedia, + br_features_gears, + br_features_silverlight, + br_cookies, + br_colordepth, + br_viewwidth, + br_viewheight, + os_name, + os_family, + os_manufacturer, + os_timezone, + dvce_type, + dvce_ismobile, + dvce_screenwidth, + dvce_screenheight, + doc_charset, + doc_width, + doc_height, + tr_currency, + tr_total_base, + tr_tax_base, + tr_shipping_base, + ti_currency, + ti_price_base, + base_currency, + geo_timezone, + mkt_clickid, + mkt_network, + etl_tags, + dvce_sent_tstamp, + refr_domain_userid, + refr_dvce_tstamp, + domain_sessionid, + derived_tstamp, + event_vendor, + event_name, + 
event_format, + event_version, + event_fingerprint, + true_tstamp, + load_tstamp, + from_json(contexts_com_snowplowanalytics_snowplow_web_page_1_0_0, 'array<struct<id:string>>') as contexts_com_snowplowanalytics_snowplow_web_page_1, + from_json(unstruct_event_com_snowplowanalytics_snowplow_consent_preferences_1_0_0, 'array<struct<basis_for_processing:string, consent_scopes:array<string>, consent_url:string, consent_version:string, domains_applied:array<string>, event_type:string, gdpr_applies:string>>') as unstruct_event_com_snowplowanalytics_snowplow_consent_preferences_1, + from_json(unstruct_event_com_snowplowanalytics_snowplow_cmp_visible_1_0_0, 'array<struct<elapsed_time:string>>') as unstruct_event_com_snowplowanalytics_snowplow_cmp_visible_1, + from_json(contexts_com_iab_snowplow_spiders_and_robots_1_0_0, 'array<struct<category:string, primaryImpact:string, reason:string, spiderOrRobot:boolean>>') as contexts_com_iab_snowplow_spiders_and_robots_1, + from_json(contexts_nl_basjes_yauaa_context_1_0_0, 'array<struct<agentClass:string, agentInformationEmail:string, agentName:string, agentNameVersion:string, agentNameVersionMajor:string, agentVersion:string, agentVersionMajor:string, deviceBrand:string, deviceClass:string, deviceCpu:string, deviceCpuBits:string, deviceName:string, deviceVersion:string, layoutEngineClass:string, layoutEngineName:string, layoutEngineNameVersion:string, layoutEngineNameVersionMajor:string, layoutEngineVersion:string, layoutEngineVersionMajor:string, networkType:string, operatingSystemClass:string, operatingSystemName:string, operatingSystemNameVersion:string, operatingSystemNameVersionMajor:string, operatingSystemVersion:string, operatingSystemVersionBuild:string, operatingSystemVersionMajor:string, webviewAppName:string, webviewAppNameVersionMajor:string, webviewAppVersion:string, webviewAppVersionMajor:string>>') as contexts_nl_basjes_yauaa_context_1, + from_json(contexts_com_snowplowanalytics_user_identifier_1_0_0, 'array<struct<user_id:string>>') as contexts_com_snowplowanalytics_user_identifier_1, + from_json(contexts_com_snowplowanalytics_user_identifier_2_0_0, 'array<struct<user_id:string>>') as contexts_com_snowplowanalytics_user_identifier_2, + from_json(contexts_com_snowplowanalytics_session_identifier_1_0_0, 'array<struct<session_id:string>>') as contexts_com_snowplowanalytics_session_identifier_1, + from_json(contexts_com_snowplowanalytics_session_identifier_2_0_0, 'array<struct<session_identifier:string>>') as contexts_com_snowplowanalytics_session_identifier_2, + from_json(contexts_com_snowplowanalytics_custom_entity_1_0_0, 'array<struct<contents:string>>') as contexts_com_snowplowanalytics_custom_entity_1 + from + {{ ref('snowplow_events') }} +) + +select + app_id, + platform, + etl_tstamp, + collector_tstamp, + dvce_created_tstamp, + event, + event_id, + txn_id, + name_tracker, + v_tracker, + v_collector, + v_etl, + user_id, + user_ipaddress, + user_fingerprint, + domain_userid, + domain_sessionidx, + network_userid, + geo_country, + geo_region, + geo_city, + geo_zipcode, + geo_latitude, + geo_longitude, + geo_region_name, + ip_isp, + ip_organization, + ip_domain, + 
ip_netspeed, + page_url, + page_title, + page_referrer, + page_urlscheme, + page_urlhost, + page_urlport, + page_urlpath, + page_urlquery, + page_urlfragment, + refr_urlscheme, + refr_urlhost, + refr_urlport, + refr_urlpath, + refr_urlquery, + refr_urlfragment, + refr_medium, + refr_source, + refr_term, + mkt_medium, + mkt_source, + mkt_term, + mkt_content, + mkt_campaign, + se_category, + se_action, + se_label, + se_property, + se_value, + tr_orderid, + tr_affiliation, + tr_total, + tr_tax, + tr_shipping, + tr_city, + tr_state, + tr_country, + ti_orderid, + ti_sku, + ti_name, + ti_category, + ti_price, + ti_quantity, + pp_xoffset_min, + pp_xoffset_max, + pp_yoffset_min, + pp_yoffset_max, + useragent, + br_name, + br_family, + br_version, + br_type, + br_renderengine, + br_lang, + br_features_pdf, + br_features_flash, + br_features_java, + br_features_director, + br_features_quicktime, + br_features_realplayer, + br_features_windowsmedia, + br_features_gears, + br_features_silverlight, + br_cookies, + br_colordepth, + br_viewwidth, + br_viewheight, + os_name, + os_family, + os_manufacturer, + os_timezone, + dvce_type, + dvce_ismobile, + dvce_screenwidth, + dvce_screenheight, + doc_charset, + doc_width, + doc_height, + tr_currency, + tr_total_base, + tr_tax_base, + tr_shipping_base, + ti_currency, + ti_price_base, + base_currency, + geo_timezone, + mkt_clickid, + mkt_network, + etl_tags, + dvce_sent_tstamp, + refr_domain_userid, + refr_dvce_tstamp, + domain_sessionid, + derived_tstamp, + event_vendor, + event_name, + event_format, + event_version, + event_fingerprint, + true_tstamp, + load_tstamp, + contexts_com_snowplowanalytics_snowplow_web_page_1, + struct( + unstruct_event_com_snowplowanalytics_snowplow_consent_preferences_1[0].basis_for_processing AS basis_for_processing, + unstruct_event_com_snowplowanalytics_snowplow_consent_preferences_1[0].consent_scopes AS consent_scopes, + unstruct_event_com_snowplowanalytics_snowplow_consent_preferences_1[0].consent_url 
AS consent_url, + unstruct_event_com_snowplowanalytics_snowplow_consent_preferences_1[0].consent_version AS consent_version, + unstruct_event_com_snowplowanalytics_snowplow_consent_preferences_1[0].domains_applied AS domains_applied, + unstruct_event_com_snowplowanalytics_snowplow_consent_preferences_1[0].event_type AS event_type, + CAST(unstruct_event_com_snowplowanalytics_snowplow_consent_preferences_1[0].gdpr_applies AS BOOLEAN) AS gdpr_applies + ) AS unstruct_event_com_snowplowanalytics_snowplow_consent_preferences_1, + struct(CAST(unstruct_event_com_snowplowanalytics_snowplow_cmp_visible_1[0].elapsed_time AS FLOAT) as elapsed_time) as unstruct_event_com_snowplowanalytics_snowplow_cmp_visible_1, + array(struct(contexts_com_iab_snowplow_spiders_and_robots_1[0].category as category, + contexts_com_iab_snowplow_spiders_and_robots_1[0].primaryImpact as primary_impact, + contexts_com_iab_snowplow_spiders_and_robots_1[0].reason as reason, + contexts_com_iab_snowplow_spiders_and_robots_1[0].spiderOrRobot as spider_or_robot)) as contexts_com_iab_snowplow_spiders_and_robots_1, + array(struct(contexts_nl_basjes_yauaa_context_1[0].agentClass as agent_class, + contexts_nl_basjes_yauaa_context_1[0].agentInformationEmail as agent_information_email, + contexts_nl_basjes_yauaa_context_1[0].agentName as agent_name, + contexts_nl_basjes_yauaa_context_1[0].agentNameVersion as agent_name_version, + contexts_nl_basjes_yauaa_context_1[0].agentNameVersionMajor as agent_name_version_major, + contexts_nl_basjes_yauaa_context_1[0].agentVersion as agent_version, + contexts_nl_basjes_yauaa_context_1[0].agentVersionMajor as agent_version_major, + contexts_nl_basjes_yauaa_context_1[0].deviceBrand as device_brand, + contexts_nl_basjes_yauaa_context_1[0].deviceClass as device_class, + contexts_nl_basjes_yauaa_context_1[0].deviceCpu as device_cpu, + contexts_nl_basjes_yauaa_context_1[0].deviceCpuBits as device_cpu_bits, + contexts_nl_basjes_yauaa_context_1[0].deviceName as device_name, + 
contexts_nl_basjes_yauaa_context_1[0].deviceVersion as device_version, + contexts_nl_basjes_yauaa_context_1[0].layoutEngineClass as layout_engine_class, + contexts_nl_basjes_yauaa_context_1[0].layoutEngineName as layout_engine_name, + contexts_nl_basjes_yauaa_context_1[0].layoutEngineNameVersion as layout_engine_name_version, + contexts_nl_basjes_yauaa_context_1[0].layoutEngineNameVersionMajor as layout_engine_name_version_major, + contexts_nl_basjes_yauaa_context_1[0].layoutEngineVersion as layout_engine_version, + contexts_nl_basjes_yauaa_context_1[0].layoutEngineVersionMajor as layout_engine_version_major, + contexts_nl_basjes_yauaa_context_1[0].networkType as network_type, + contexts_nl_basjes_yauaa_context_1[0].operatingSystemClass as operating_system_class, + contexts_nl_basjes_yauaa_context_1[0].operatingSystemName as operating_system_name, + contexts_nl_basjes_yauaa_context_1[0].operatingSystemNameVersion as operating_system_name_version, + contexts_nl_basjes_yauaa_context_1[0].operatingSystemNameVersionMajor as operating_system_name_version_major, + contexts_nl_basjes_yauaa_context_1[0].operatingSystemVersion as operating_system_version, + contexts_nl_basjes_yauaa_context_1[0].operatingSystemVersionBuild as operating_system_version_build, + contexts_nl_basjes_yauaa_context_1[0].operatingSystemVersionMajor as operating_system_version_major, + contexts_nl_basjes_yauaa_context_1[0].webviewAppName as webview_app_name, + contexts_nl_basjes_yauaa_context_1[0].webviewAppNameVersionMajor as webview_app_name_version_major, + contexts_nl_basjes_yauaa_context_1[0].webviewAppVersion as webview_app_version, + contexts_nl_basjes_yauaa_context_1[0].webviewAppVersionMajor as webview_app_version_major)) as contexts_nl_basjes_yauaa_context_1, + array(struct(contexts_com_snowplowanalytics_user_identifier_1[0].user_id as user_id)) as contexts_com_snowplowanalytics_user_identifier_1, + array(struct(contexts_com_snowplowanalytics_user_identifier_2[0].user_id as user_id)) as 
contexts_com_snowplowanalytics_user_identifier_2, + array(struct(contexts_com_snowplowanalytics_session_identifier_1[0].session_id as session_id)) as contexts_com_snowplowanalytics_session_identifier_1, + array(struct(contexts_com_snowplowanalytics_session_identifier_2[0].session_identifier as session_identifier)) as contexts_com_snowplowanalytics_session_identifier_2, + + {% if var("snowplow__custom_test", false) %} + array(struct(contexts_com_snowplowanalytics_custom_entity_1[0].contents as contents)) AS contexts_com_snowplowanalytics_custom_entity_1 + {% else %} + {# typed null so the column keeps the same array-of-struct shape as the branch above; field type assumed to be string - confirm against the seed data #} cast(null as array<struct<contents:string>>) as contexts_com_snowplowanalytics_custom_entity_1 + {% endif %} +from + prep \ No newline at end of file diff --git a/integration_tests/models/materializations/default_strategy/test_incremental.sql b/integration_tests/models/materializations/default_strategy/test_incremental.sql index 9b08b3dc..a3372fa1 100644 --- a/integration_tests/models/materializations/default_strategy/test_incremental.sql +++ b/integration_tests/models/materializations/default_strategy/test_incremental.sql @@ -9,22 +9,18 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 partition_by: BQ only. Key used to limit table scan TODO: Add tests that change the granularity of the partition #} -{{ - config( +{{ config( materialized='incremental', - unique_key=['id', 'id2'], + unique_key=['id','id2'], upsert_date_key='start_tstamp', - partition_by = snowplow_utils.get_value_by_target_type(bigquery_val={ - "field": "start_tstamp", - "data_type": "timestamp" - }), tags=["requires_script"], - snowplow_optimize=true - ) -}} + snowplow_optimize=true, +) }} + with data as ( - select * from {{ ref('data_incremental') }} + select * + from {{ ref('data_incremental') }} {% if target.type == 'snowflake' %} -- data set intentionally contains dupes. -- Snowflake merge will error if dupes occur. 
Removing for test diff --git a/integration_tests/models/materializations/default_strategy/test_incremental_w_lookback_disabled.sql b/integration_tests/models/materializations/default_strategy/test_incremental_w_lookback_disabled.sql index c64a50b1..cb52a815 100644 --- a/integration_tests/models/materializations/default_strategy/test_incremental_w_lookback_disabled.sql +++ b/integration_tests/models/materializations/default_strategy/test_incremental_w_lookback_disabled.sql @@ -24,9 +24,10 @@ incremental materialization with lookback disabled. ) }} -with data as ( + +WITH data as ( select * from {{ ref('data_incremental') }} - {% if target.type == 'snowflake' %} + {% if target.type in ['snowflake'] %} -- data set intentionally contains dupes. -- Snowflake merge will error if dupes occur. Removing for test where not (run = 1 and id = 2 and start_tstamp = '2021-03-03 00:00:00') diff --git a/integration_tests/models/utils/test_snowplow_delete_from_manifest.sql b/integration_tests/models/utils/test_snowplow_delete_from_manifest.sql index a1f4d568..e56f02f8 100644 --- a/integration_tests/models/utils/test_snowplow_delete_from_manifest.sql +++ b/integration_tests/models/utils/test_snowplow_delete_from_manifest.sql @@ -14,6 +14,5 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 -- Note: Test covers functionality however when running the macro on-run-start hook, transaction behaviour changes. -- Wrapped delete statement in transation so it commits. BQ wouldnt just support 'commit;' without opening trans. Snowflake behaviour untested. 
-select * - -from {{ ref('data_snowplow_delete_from_manifest_staging') }} +SELECT * +FROM {{ ref('data_snowplow_delete_from_manifest_staging') }} diff --git a/macros/base/base_create_snowplow_events_this_run.sql b/macros/base/base_create_snowplow_events_this_run.sql index 73a51485..6e945abf 100644 --- a/macros/base/base_create_snowplow_events_this_run.sql +++ b/macros/base/base_create_snowplow_events_this_run.sql @@ -255,3 +255,71 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {{ return(events_this_run_query) }} {% endmacro %} + + +{% macro spark__base_create_snowplow_events_this_run(sessions_this_run_table, session_identifiers, session_sql, session_timestamp, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, entities_or_sdes, custom_sql, allow_null_dvce_tstamps) %} {# Spark dispatch: returns the events-this-run query text. Each event gets a session_identifier (session_sql if given, otherwise the first non-null of session_identifiers), is numbered per event_id for de-duplication, and is inner-joined to the sessions-this-run relation inside the session-length, lateness and lower/upper limit windows. #} + {%- set lower_limit, upper_limit = snowplow_utils.return_limits_from_model(ref(sessions_this_run_table), + 'start_tstamp', + 'end_tstamp') %} + {% set sessions_this_run = ref(sessions_this_run_table) %} + {% set snowplow_events = api.Relation.create(database=snowplow_events_database, schema=snowplow_events_schema, identifier=snowplow_events_table) %} + + {% set events_this_run_query %} + with identified_events AS ( + select + {% if session_sql %} + {{ session_sql }} as session_identifier, + {% else -%} + COALESCE( + {% for identifier in session_identifiers %} + {%- if identifier['schema']|lower != 'atomic' -%} + {{ snowplow_utils.get_field(identifier['schema'], identifier['field'], 'e', dbt.type_string(), 0, snowplow_events) }} + {%- else -%} + e.{{identifier['field']}} + {%- endif -%} + , + {%- endfor -%} + NULL + ) as session_identifier, + {%- endif %} + e.*, + row_number() over (partition by event_id order by {{ session_timestamp }}, dvce_created_tstamp) as event_id_dedupe_index + + from {{ snowplow_events }} e + + ) + + select + a.* + ,b.user_identifier -- take 
user_identifier from manifest. This ensures only 1 domain_userid per session. + {% if custom_sql %} + , {{ custom_sql }} + {% endif %} + + from identified_events as a + inner join {{ sessions_this_run }} as b + on a.session_identifier = b.session_identifier + + where a.{{ session_timestamp }} <= {{ snowplow_utils.timestamp_add('day', max_session_days, 'b.start_tstamp') }} + {% if allow_null_dvce_tstamps %} + and coalesce(a.dvce_sent_tstamp, a.collector_tstamp) <= {{ snowplow_utils.timestamp_add('day', days_late_allowed, 'coalesce(a.dvce_created_tstamp, a.collector_tstamp)') }} + {% else %} + and a.dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', days_late_allowed, 'a.dvce_created_tstamp') }} + {% endif %} + and a.{{ session_timestamp }} >= {{ lower_limit }} + and a.{{ session_timestamp }} <= {{ upper_limit }} + and a.{{ session_timestamp }} >= b.start_tstamp -- deal with late loading events + + {# NOTE(review): this is the spark dispatch, so the target.type == 'bigquery' test below looks unreachable - confirm before relying on it #} {% if derived_tstamp_partitioned and target.type == 'bigquery' | as_bool() %} + and a.derived_tstamp >= {{ snowplow_utils.timestamp_add('hour', -1, lower_limit) }} + and a.derived_tstamp <= {{ upper_limit }} + {% endif %} + + and {{ snowplow_utils.app_id_filter(app_ids) }} + + and a.event_id_dedupe_index = 1 + {% endset %} + + {{ return(events_this_run_query) }} + +{% endmacro %} \ No newline at end of file diff --git a/macros/utils/cross_db/get_field.sql b/macros/utils/cross_db/get_field.sql index 3a2dc8df..94d6d487 100644 --- a/macros/utils/cross_db/get_field.sql +++ b/macros/utils/cross_db/get_field.sql @@ -29,7 +29,13 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {% if '*' in column_name %} {% do exceptions.raise_compiler_error('Wildcard schema versions are only supported for Bigquery, they are not supported for ' ~ target.type) %} {% else %} - {%- if table_alias -%}{{table_alias}}.{%- endif -%}{{column_name}}{%- if array_index is not none -%}[{{array_index}}]{%- endif -%}.{{field_name}}{%- if type -%}::{{type}}{%- endif -%} 
+ {#- Any falsy type (none or an empty string) means "no cast", as the replaced "if type" check did; testing only for none would render CAST(... AS ) for an empty string -#}{%- if not type -%} + {%- if table_alias -%}{{table_alias}}.{%- endif -%}{{column_name}}{%- if array_index is not none -%}[{{array_index}}]{%- endif -%}.{{field_name}} + {%- else -%} + CAST( + {%- if table_alias -%}{{table_alias}}.{%- endif -%}{{column_name}}{%- if array_index is not none -%}[{{array_index}}]{%- endif -%}.{{field_name}} AS {{type}} + ) + {%- endif -%} {% endif %} {% endmacro %} diff --git a/macros/utils/cross_db/get_string_agg.sql b/macros/utils/cross_db/get_string_agg.sql index 90384af6..6d3ec71c 100644 --- a/macros/utils/cross_db/get_string_agg.sql +++ b/macros/utils/cross_db/get_string_agg.sql @@ -93,6 +93,36 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {% endmacro %} {% macro spark__get_string_agg(base_column, column_prefix, separator=',', order_by_column=base_column, sort_numeric=false, order_by_column_prefix=column_prefix, is_distinct=false, order_desc = false) %} + /* Explaining inside out: + 1. Create a group array which is made of sub-arrays of the base_column and the sort column + 2. Sort these sub-arrays based on a lambda function that compares on the second element (the sort column, casted if needed) + 3. Use transform to select just the first element of the array + 4. Optionally use array_distinct + 5. 
Join the array into a string + */ + array_join( + {% if is_distinct %} array_distinct( {% endif %} + transform( + array_sort( + FILTER(collect_list( + ARRAY(cast({{column_prefix}}.{{base_column}} as string), cast({{order_by_column_prefix}}.{{order_by_column}} as string))), x -> x[0] is not null), (left, right) -> + + {%- if sort_numeric -%} + CASE WHEN cast(left[1] as decimal(38, 9)) {% if order_desc %} > {% else %} < {% endif %} cast(right[1] as decimal(38, 9)) THEN -1 + WHEN cast(left[1] as decimal(38, 9)) {% if order_desc %} < {% else %} > {% endif %} cast(right[1] as decimal(38, 9)) THEN 1 ELSE 0 END + + {% else %} + CASE WHEN left[1] {% if order_desc %} > {% else %} < {% endif %} right[1] THEN -1 + WHEN left[1] {% if order_desc %} < {% else %} > {% endif %} right[1] THEN 1 ELSE 0 END + + {% endif %} + ), x -> x[0]) + {% if is_distinct %} ) {% endif %}, + '{{separator}}') +{% endmacro %} + + +{% macro databricks__get_string_agg(base_column, column_prefix, separator=',', order_by_column=base_column, sort_numeric=false, order_by_column_prefix=column_prefix, is_distinct=false, order_desc = false) %} /* Explaining inside out: 1. Create a group array which is made of sub-arrays of the base_column and the sort column 2. 
Sort these sub-arrays based on a lamdba function that compares on the second element (the sort column, casted if needed) diff --git a/macros/utils/cross_db/timestamp_functions.sql b/macros/utils/cross_db/timestamp_functions.sql index e4ef4f03..feb0bdb0 100644 --- a/macros/utils/cross_db/timestamp_functions.sql +++ b/macros/utils/cross_db/timestamp_functions.sql @@ -14,16 +14,36 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {{ return(adapter.dispatch('timestamp_diff', 'snowplow_utils')(first_tstamp, second_tstamp, datepart)) }} {% endmacro %} - {% macro default__timestamp_diff(first_tstamp, second_tstamp, datepart) %} {{ return(datediff(first_tstamp, second_tstamp, datepart)) }} {% endmacro %} - {% macro bigquery__timestamp_diff(first_tstamp, second_tstamp, datepart) %} timestamp_diff({{second_tstamp}}, {{first_tstamp}}, {{datepart}}) {% endmacro %} +{% macro databricks__timestamp_diff(first_tstamp, second_tstamp, datepart) %} + {{ return(datediff(first_tstamp, second_tstamp, datepart)) }} +{% endmacro %} + +{% macro spark__timestamp_diff(first_tstamp, second_tstamp, datepart) %} {# Renders second_tstamp minus first_tstamp in whole dateparts: both sides are cast to timestamp, converted to epoch values, subtracted and truncated by the bigint cast. Supported dateparts: week, day, hour, minute, second, millisecond; anything else raises a compiler error. #} + {% if datepart|lower == 'week' %} + cast((unix_timestamp(cast({{second_tstamp}} as timestamp)) - unix_timestamp(cast({{first_tstamp}} as timestamp))) / (3600 * 24 * 7) as bigint) + {% elif datepart|lower == 'day' %} + cast((unix_timestamp(cast({{second_tstamp}} as timestamp)) - unix_timestamp(cast({{first_tstamp}} as timestamp))) / (3600 * 24) as bigint) + {% elif datepart|lower == 'hour' %} + cast((unix_timestamp(cast({{second_tstamp}} as timestamp)) - unix_timestamp(cast({{first_tstamp}} as timestamp))) / 3600 as bigint) + {% elif datepart|lower == 'minute' %} + cast((unix_timestamp(cast({{second_tstamp}} as timestamp)) - unix_timestamp(cast({{first_tstamp}} as timestamp))) / 60 as bigint) + {% elif datepart|lower == 'second' %} + cast(unix_timestamp(cast({{second_tstamp}} as timestamp)) - unix_timestamp(cast({{first_tstamp}} as timestamp)) as 
bigint) + {% elif datepart|lower == 'millisecond' %} + {# unix_timestamp() has whole-second resolution, so scaling it by 1000 would drop the sub-second part; unix_millis() keeps it #} cast(unix_millis(cast({{second_tstamp}} as timestamp)) - unix_millis(cast({{first_tstamp}} as timestamp)) as bigint) + {% else %} + {{ exceptions.raise_compiler_error("Unsupported datepart for Spark: " ~ datepart) }} + {% endif %} +{% endmacro %} + {% macro timestamp_add(datepart, interval, tstamp) %} {{ return(adapter.dispatch('timestamp_add', 'snowplow_utils')(datepart, interval, tstamp)) }} @@ -44,6 +64,23 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 timestampadd({{datepart}}, {{interval}}, {{tstamp}}) {% endmacro %} +{% macro spark__timestamp_add(datepart, interval, tstamp) %} {# Renders tstamp shifted by interval dateparts: the timestamp is converted to epoch milliseconds, the interval (scaled to milliseconds) is added, and the sum is converted back with timestamp_millis. Same supported dateparts as spark__timestamp_diff; anything else raises a compiler error. #} + {% if datepart|lower == 'week' %} + timestamp_millis(cast(cast(unix_millis({{tstamp}}) as bigint) + (cast({{interval}} as bigint) * cast(3600 as bigint) * cast(24 as bigint) * cast(7 as bigint) * cast(1000 as bigint)) as bigint)) + {% elif datepart|lower == 'day' %} + timestamp_millis(cast(cast(unix_millis({{tstamp}}) as bigint) + (cast({{interval}} as bigint) * cast(3600 as bigint) * cast(24 as bigint) * cast(1000 as bigint)) as bigint)) + {% elif datepart|lower == 'hour' %} + timestamp_millis(cast(cast(unix_millis({{tstamp}}) as bigint) + (cast({{interval}} as bigint) * cast(3600 as bigint) * cast(1000 as bigint)) as bigint)) + {% elif datepart|lower == 'minute' %} + timestamp_millis(cast(cast(unix_millis({{tstamp}}) as bigint) + (cast({{interval}} as bigint) * cast(60 as bigint) * cast(1000 as bigint)) as bigint)) + {% elif datepart|lower == 'second' %} + timestamp_millis(cast(cast(unix_millis({{tstamp}}) as bigint) + cast({{interval}} as bigint) * cast(1000 as bigint) as bigint)) + {% elif datepart|lower == 'millisecond' %} + timestamp_millis(cast(cast(unix_millis({{tstamp}}) as bigint) + cast({{interval}} as bigint) as bigint)) + {% else %} + {{ exceptions.raise_compiler_error("Unsupported datepart for Spark: " ~ datepart) }} + {% endif %} +{% endmacro %} {% macro 
cast_to_tstamp(tstamp_literal) -%} {% if tstamp_literal is none or tstamp_literal|lower in ['null',''] %} diff --git a/macros/utils/get_value_by_target_type.sql b/macros/utils/get_value_by_target_type.sql index 4c301315..6d89b6ae 100644 --- a/macros/utils/get_value_by_target_type.sql +++ b/macros/utils/get_value_by_target_type.sql @@ -4,7 +4,7 @@ This program is licensed to you under the Snowplow Personal and Academic License and you may not use this file except in compliance with the Snowplow Personal and Academic License Version 1.0. You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 at https://docs.snowplow.io/personal-and-academic-license-1.0/ #} -{%- macro get_value_by_target_type(bigquery_val=none, snowflake_val=none, redshift_val=none, postgres_val=none, databricks_val=none) -%} +{%- macro get_value_by_target_type(bigquery_val=none, snowflake_val=none, redshift_val=none, postgres_val=none, databricks_val=none, spark_val=none) -%} {% if target.type == 'bigquery' %} {{ return(bigquery_val) }} @@ -14,8 +14,10 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {{ return(redshift_val) }} {% elif target.type == 'postgres' %} {{ return(postgres_val) }} - {% elif target.type in ['databricks', 'spark'] %} + {% elif target.type in ['databricks'] %} {{ return(databricks_val) }} + {% elif target.type in ['spark'] %} + {{ return(spark_val) }} {% else %} {{ exceptions.raise_compiler_error("Snowplow: Unexpected target type "~target.type) }} {% endif %} diff --git a/macros/utils/snowplow_delete_from_manifest.sql b/macros/utils/snowplow_delete_from_manifest.sql index a8f34da7..63b8d60d 100644 --- a/macros/utils/snowplow_delete_from_manifest.sql +++ b/macros/utils/snowplow_delete_from_manifest.sql @@ -45,17 +45,22 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {%- endif -%} {% set delete_statement %} - {%- if target.type in ['databricks', 'spark'] -%} - delete from {{ 
incremental_manifest_table }} where model in ({{ snowplow_utils.print_list(matched_models) }}); + {%- if target.type in ['databricks'] -%} + DELETE FROM {{ incremental_manifest_table }} + WHERE model IN ({{ snowplow_utils.print_list(matched_models) }}); + {%- elif target.type in ['spark'] -%} + DELETE FROM {{ incremental_manifest_table }} + WHERE model IN ({{ snowplow_utils.print_list(matched_models) }}); {%- else -%} -- We don't need transaction but Redshift needs commit statement while BQ does not. By using transaction we cover both. - begin; - delete from {{ incremental_manifest_table }} where model in ({{ snowplow_utils.print_list(matched_models) }}); - commit; + BEGIN; + DELETE FROM {{ incremental_manifest_table }} + WHERE model IN ({{ snowplow_utils.print_list(matched_models) }}); + COMMIT; {%- endif -%} - {% endset %} - {%- do run_query(delete_statement) -%} + {% endset %} + {%- do adapter.execute(delete_statement) -%} {%- if matched_models|length -%} {% do snowplow_utils.log_message("Snowplow: Deleted models "+snowplow_utils.print_list(matched_models)+" from the manifest") %}