Skip to content

Commit

Permalink
Based on feedback from RC release add improvent for Spark iceberg sup…
Browse files Browse the repository at this point in the history
…port

* Internal CI config changes
* Change the way we define file format
* Fix a bug in post_ci_cleanup internal macro
  • Loading branch information
ilias1111 committed Oct 11, 2024
1 parent c37fef2 commit 306faff
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 22 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/pr_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ jobs:
- name: Configure Docker credentials
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
username: ${{ secrets.DOCKERHUB_SNOWPLOWCI_READ_USERNAME }}
password: ${{ secrets.DOCKERHUB_SNOWPLOWCI_READ_PASSWORD }}
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v1
with:
Expand Down
6 changes: 2 additions & 4 deletions integration_tests/ci/profiles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,7 @@ integration_tests:
host: "{{ env_var('SPARK_MASTER_HOST', 'localhost') }}"
port: 10000
user: "{{ env_var('SPARK_USER', 'spark') }}"
schema: "{{ env_var('SPARK_SCHEMA', 'default') }}"
schema: "gh_sp_utils_dbt_{{ env_var('SCHEMA_SUFFIX') }}"
connect_retries: 5
connect_timeout: 60
threads: 1
vars:
snowplow__datalake_file_format: iceberg
threads: 1
8 changes: 1 addition & 7 deletions integration_tests/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,7 @@ models:
snowplow_utils_integration_tests:
+schema: "snplw_utils_int_tests"
+incremental_strategy: "{{ none if target.type not in ['spark'] else 'merge' }}"
+file_format: >
{{
var(
'snowplow__datalake_file_format',
'delta' if target.type != 'spark' else 'iceberg'
)
}}
+file_format: "{{ 'delta' if target.type not in ['spark'] else 'iceberg'}}"
materializations:
snowflake_delete_insert:
enabled: "{{ target.type == 'snowflake' | as_bool() }}"
Expand Down
20 changes: 12 additions & 8 deletions macros/utils/get_schemas_by_pattern.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ This program is licensed to you under the Snowplow Personal and Academic License
and you may not use this file except in compliance with the Snowplow Personal and Academic License Version 1.0.
You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 at https://docs.snowplow.io/personal-and-academic-license-1.0/
#}
{% macro get_schemas_by_pattern(schema_pattern) %}
{% macro get_schemas_by_pattern(schema_pattern=target.schema) %}
{{ return(adapter.dispatch('get_schemas_by_pattern', 'snowplow_utils')
(schema_pattern)) }}
{% endmacro %}
Expand All @@ -19,16 +19,20 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
{% endmacro %}

{% macro spark__get_schemas_by_pattern(schema_pattern) %}
{# databricks/spark uses a regex on SHOW SCHEMAS and doesn't have an information schema in hive_metastore #}
{%- set schema_pattern= dbt.replace(schema_pattern, "%", "*") -%}
{#
Databricks/Spark uses a regex on SHOW SCHEMAS and doesn't have an information schema in hive_metastore.
Replace '%' with '*' for Spark's pattern matching.
#}
{%- set adjusted_schema_pattern = schema_pattern | replace("%", "*") -%}

{# Get all schemas with the target.schema prefix #}
{%- set get_schemas_sql -%}
SHOW SCHEMAS LIKE '{{schema_pattern}}';
{%- endset -%}
{# Construct the SHOW SCHEMAS LIKE query #}
{%- set get_schemas_sql = "SHOW SCHEMAS LIKE '" ~ adjusted_schema_pattern ~ "'" -%}

{# Execute the query and fetch results #}
{% set results = run_query(get_schemas_sql) %}
{% set schemas = results|map(attribute='databaseName')|unique|list %}

{# Extract schema names from the results #}
{% set schemas = results.columns[0].values() | unique | list %}

{{ return(schemas) }}

Expand Down
87 changes: 86 additions & 1 deletion macros/utils/post_ci_cleanup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,56 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
{# Destructive macro. Use with care! #}

{% macro post_ci_cleanup(schema_pattern=target.schema) %}
{{ return(adapter.dispatch('post_ci_cleanup', 'snowplow_utils')(schema_pattern)) }}
{% endmacro %}


{% macro default__post_ci_cleanup(schema_pattern=target.schema) %}

{# Get all schemas with the target.schema prefix #}
{% set schemas = snowplow_utils.get_schemas_by_pattern(schema_pattern~'%') %}

{% if schemas|length %}

{%- if target.type in ['databricks'] -%}
{# Generate sql to drop all identified schemas #}
{% for schema in schemas -%}
{%- set drop_schema_sql -%}
DROP SCHEMA IF EXISTS {{schema}} CASCADE;
{%- endset -%}

{% do run_query(drop_schema_sql) %}

{% endfor %}

{%- else -%}
{# Generate sql to drop all identified schemas #}
{% set drop_schema_sql -%}

{% for schema in schemas -%}
DROP SCHEMA IF EXISTS {{schema}} CASCADE;
{% endfor %}

{%- endset %}

{# Drop schemas #}
{% do run_query(drop_schema_sql) %}

{%- endif -%}

{% endif %}

{% endmacro %}


{% macro databricks__post_ci_cleanup(schema_pattern=target.schema) %}

{# Get all schemas with the target.schema prefix #}
{% set schemas = snowplow_utils.get_schemas_by_pattern(schema_pattern~'%') %}

{% if schemas|length %}

{%- if target.type in ['databricks', 'spark'] -%}
{%- if target.type in ['databricks'] -%}
{# Generate sql to drop all identified schemas #}
{% for schema in schemas -%}
{%- set drop_schema_sql -%}
Expand Down Expand Up @@ -42,3 +85,45 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
{% endif %}

{% endmacro %}

{#
Spark-specific implementation for post CI cleanup.
#}

{% macro spark__post_ci_cleanup(schema_pattern=target.schema) %}
{# Retrieve all schemas matching the pattern #}
{% set schemas = snowplow_utils.get_schemas_by_pattern(schema_pattern ~ "%") %}

{% if schemas | length > 0 %}
{% for schema in schemas %}
{{ log("Processing schema: " ~ schema, info=True) }}

{# Step 1: List all tables in the current schema #}
{% set tables_query = "SHOW TABLES IN " ~ schema %}
{% set tables_result = run_query(tables_query) %}

{# Initialize an empty list for tables #}
{% set table_list = [] %}

{% if tables_result and tables_result.rows %}
{% for row in tables_result.rows %}
{% set table = row[1] %}
{% do table_list.append(table) %}
{% endfor %}

{# Step 2: Drop each table individually #}
{% for table in table_list %}
{% set drop_table_sql = "DROP TABLE IF EXISTS " ~ schema ~ "." ~ table ~ ";" %}
{% do adapter.execute(drop_table_sql) %}
{% endfor %}
{% else %}
{% endif %}

{# Step 3: Drop the schema #}
{% set drop_schema_sql = "DROP SCHEMA IF EXISTS " ~ schema ~ ";" %}
{% do adapter.execute(drop_schema_sql) %}
{% endfor %}
{% else %}
{{ log("No schemas found matching pattern: " ~ schema_pattern, info=True) }}
{% endif %}
{% endmacro %}

0 comments on commit 306faff

Please sign in to comment.