Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redshift: Use QUALIFY statement instead of prep-CTEs to enhance performance #302

Merged
merged 4 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 11 additions & 34 deletions macros/tables/redshift/eff_sat_v0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,13 @@ source_data AS (
In all incremental cases, the current status for each hashkey is selected from the existing Effectivity Satellite.
#}
{%- if is_incremental() %}
current_status_prep AS (

SELECT
{{ tracked_hashkey }},
{{ is_active_alias}},
ROW_NUMBER() OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }} DESC) as rn
FROM {{ this }}

),

current_status AS (

SELECT
{{ tracked_hashkey }},
{{ is_active_alias }}
FROM current_status_prep
WHERE rn = 1
FROM {{ this }} redshift_requires_an_alias_if_the_qualify_is_directly_after_the_from
QUALIFY ROW_NUMBER() OVER(PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }} DESC) = 1

),
{% endif %}
Expand Down Expand Up @@ -136,32 +126,19 @@ current_status AS (

{#
The rows are deduplicated on the is_active_alias, to only include status changes.
Additionally, a ROW_NUMBER() is calculated in incremental runs, to use it in the next step for comparison against the current status.
#}
deduplicated_incoming_prep AS (

SELECT
is_active.{{ tracked_hashkey }},
is_active.{{ src_ldts }},
is_active.{{ is_active_alias }},
LAG(is_active.{{ is_active_alias }}) OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }}) as lag_is_active

FROM is_active

),

deduplicated_incoming AS (

SELECT
deduplicated_incoming_prep.{{ tracked_hashkey }},
deduplicated_incoming_prep.{{ src_ldts }},
deduplicated_incoming_prep.{{ is_active_alias }}

FROM
deduplicated_incoming_prep
WHERE
deduplicated_incoming_prep.{{ is_active_alias }} != deduplicated_incoming_prep.lag_is_active
OR deduplicated_incoming_prep.lag_is_active IS NULL
ia.{{ tracked_hashkey }},
ia.{{ src_ldts }},
ia.{{ is_active_alias }}
FROM is_active ia
QUALIFY
CASE
WHEN ia.{{ is_active_alias }} = LAG(ia.{{ is_active_alias }}) OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }}) THEN FALSE
ELSE TRUE
END

),

Expand Down
16 changes: 6 additions & 10 deletions macros/tables/redshift/hub.sql
Original file line number Diff line number Diff line change
Expand Up @@ -207,21 +207,17 @@ source_new_union AS (

{%- endif %}

earliest_hk_over_all_sources_prep AS (
SELECT
lcte.*,
ROW_NUMBER() OVER (PARTITION BY {{ hashkey }} ORDER BY {{ src_ldts
}}) as rn
FROM {{ ns.last_cte }} AS lcte),

earliest_hk_over_all_sources AS (

{#- Deduplicate the unioned records again so that only the earliest one per hashkey is inserted. #}
SELECT
lcte.*
FROM earliest_hk_over_all_sources_prep AS lcte
WHERE rn = 1
{%- set ns.last_cte = 'earliest_hk_over_all_sources' -%}),
FROM {{ ns.last_cte }} AS lcte
QUALIFY ROW_NUMBER() OVER (PARTITION BY {{ hashkey }} ORDER BY {{ src_ldts }}) = 1

{%- set ns.last_cte = 'earliest_hk_over_all_sources' -%}

),

records_to_insert AS (
{#- Select everything from the previous CTE, if incremental filter for hashkeys that are not already in the hub. #}
Expand Down
16 changes: 6 additions & 10 deletions macros/tables/redshift/link.sql
Original file line number Diff line number Diff line change
Expand Up @@ -210,21 +210,17 @@ source_new_union AS (

{%- endif %}

earliest_hk_over_all_sources_prep AS (
SELECT
lcte.*,
ROW_NUMBER() OVER (PARTITION BY {{ link_hashkey }} ORDER BY {{ src_ldts
}}) as rn
FROM {{ ns.last_cte }} AS lcte),

earliest_hk_over_all_sources AS (

{#- Deduplicate the unioned records again so that only the earliest one per link hashkey is inserted. #}
SELECT
lcte.*
FROM earliest_hk_over_all_sources_prep AS lcte
WHERE rn = 1
{%- set ns.last_cte = 'earliest_hk_over_all_sources' -%}),
FROM {{ ns.last_cte }} AS lcte
QUALIFY ROW_NUMBER() OVER (PARTITION BY {{ link_hashkey }} ORDER BY {{ src_ldts }}) = 1

{%- set ns.last_cte = 'earliest_hk_over_all_sources' -%}

),

records_to_insert AS (
{# Select everything from the previous CTE, if incremental filter for hashkeys that are not already in the link. #}
Expand Down
31 changes: 8 additions & 23 deletions macros/tables/redshift/ma_sat_v0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -41,44 +41,29 @@ source_data AS (

{# Get the latest record for each parent hashkey in existing sat, if incremental. #}
{%- if is_incremental() %}
latest_entries_in_sat_prep AS (

SELECT
{{ parent_hashkey }},
{{ ns.hdiff_alias }},
ROW_NUMBER() OVER(PARTITION BY {{ parent_hashkey|lower }} ORDER BY {{ src_ldts }} DESC) as rn
FROM
{{ this }}
),

latest_entries_in_sat AS (

SELECT
{{ parent_hashkey }},
{{ ns.hdiff_alias }}
FROM
latest_entries_in_sat_prep
WHERE rn = 1
{{ this }} redshift_requires_an_alias_if_the_qualify_is_directly_after_the_from
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aaaaha

QUALIFY ROW_NUMBER() OVER(PARTITION BY {{ parent_hashkey }} ORDER BY {{ src_ldts }} DESC) = 1
),
{%- endif %}

{# Get a list of all distinct hashdiffs that exist for each parent_hashkey. #}
lag_source_data AS (
SELECT
{{ parent_hashkey }},
{{ src_ldts }},
{{ ns.hdiff_alias }},
LAG({{ ns.hdiff_alias }}) OVER (PARTITION BY {{ parent_hashkey }} ORDER BY {{ src_ldts }}) as prev_ns_hdiff_alias
FROM source_data
),

deduped_row_hashdiff AS (

SELECT
{{ parent_hashkey }},
{{ src_ldts }},
{{ ns.hdiff_alias }}
FROM lag_source_data
WHERE {{ ns.hdiff_alias }} != prev_ns_hdiff_alias OR prev_ns_hdiff_alias IS NULL
FROM source_data redshift_requires_an_alias_if_the_qualify_is_directly_after_the_from
QUALIFY CASE
WHEN {{ ns.hdiff_alias }} = LAG({{ ns.hdiff_alias }}) OVER (PARTITION BY {{ parent_hashkey }} ORDER BY {{ src_ldts }}) THEN FALSE
ELSE TRUE
END
),

{# Dedupe the source data regarding non-delta groups. #}
Expand Down
18 changes: 7 additions & 11 deletions macros/tables/redshift/nh_link.sql
Original file line number Diff line number Diff line change
Expand Up @@ -226,21 +226,17 @@ source_new_union AS (

{%- if not source_is_single_batch %}

earliest_hk_over_all_sources_prep AS (
SELECT
lcte.*,
ROW_NUMBER() OVER (PARTITION BY {{ link_hashkey }} ORDER BY {{ src_ldts
}}) as rn
FROM {{ ns.last_cte }} AS lcte),

earliest_hk_over_all_sources AS (

{#- Deduplicate the unionized records again to only insert the earliest one. #}
{#- Deduplicate the unionized records again to only insert the earliest one. #}
SELECT
lcte.*
FROM earliest_hk_over_all_sources_prep AS lcte
WHERE rn = 1
{%- set ns.last_cte = 'earliest_hk_over_all_sources' -%}),
FROM {{ ns.last_cte }} AS lcte
QUALIFY ROW_NUMBER() OVER (PARTITION BY {{ link_hashkey }} ORDER BY {{ src_ldts }}) = 1

{%- set ns.last_cte = 'earliest_hk_over_all_sources' -%}

),

{%- endif %}

Expand Down
44 changes: 10 additions & 34 deletions macros/tables/redshift/ref_sat_v0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,6 @@ source_data AS (

{# Get the latest record for each parent ref key combination in existing sat, if incremental. #}
{%- if is_incremental() %}
latest_entries_in_sat_prep AS (

SELECT
{% for ref_key in parent_ref_keys %}
{{ref_key}},
{% endfor %}
{{ ns.hdiff_alias }},
ROW_NUMBER() OVER(PARTITION BY {%- for ref_key in parent_ref_keys %} {{ref_key|lower}} {%- if not loop.last %}, {% endif %}{% endfor %} ORDER BY {{ src_ldts }} DESC) as rn
FROM
{{ this }}
),

latest_entries_in_sat AS (

SELECT
Expand All @@ -66,16 +54,16 @@ latest_entries_in_sat AS (
{% endfor %}
{{ ns.hdiff_alias }}
FROM
latest_entries_in_sat_prep
WHERE rn = 1
{{ this }} redshift_requires_an_alias_if_the_qualify_is_directly_after_the_from
QUALIFY ROW_NUMBER() OVER(PARTITION BY {%- for ref_key in parent_ref_keys %} {{ref_key}} {%- if not loop.last %}, {% endif %}{% endfor %} ORDER BY {{ src_ldts }} DESC) = 1
),
{%- endif %}

{#
Deduplicate the source by comparing each hashdiff to the hashdiff of the previous record, for each parent ref key combination.
Additionally, a row number based on that order is added in incremental runs.
#}
deduplicated_numbered_source_prep AS (
deduplicated_numbered_source AS (

SELECT
{% for ref_key in parent_ref_keys %}
Expand All @@ -86,24 +74,12 @@ deduplicated_numbered_source_prep AS (
{% if is_incremental() -%}
, ROW_NUMBER() OVER(PARTITION BY {%- for ref_key in parent_ref_keys %} {{ref_key}} {%- if not loop.last %}, {% endif %}{% endfor %} ORDER BY {{ src_ldts }}) as rn
{%- endif %}
, LAG({{ ns.hdiff_alias }}) OVER(PARTITION BY {%- for ref_key in parent_ref_keys %} {{ref_key|lower}} {%- if not loop.last %}, {% endif %}{% endfor %} ORDER BY {{ src_ldts }}) as prev_hashdiff
FROM source_data
),

deduplicated_numbered_source AS (

SELECT
{% for ref_key in parent_ref_keys %}
{{ref_key}},
{% endfor %}
{{ ns.hdiff_alias }},
{{ datavault4dbt.print_list(source_cols) }}
FROM deduplicated_numbered_source_prep
WHERE 1=1
AND {{ ns.hdiff_alias }} <> prev_hashdiff OR prev_hashdiff IS NULL
{% if is_incremental() -%}
AND rn = 1
{%- endif %}
FROM source_data redshift_requires_an_alias_if_the_qualify_is_directly_after_the_from
QUALIFY
CASE
WHEN {{ ns.hdiff_alias }} = LAG({{ ns.hdiff_alias }}) OVER(PARTITION BY {%- for ref_key in parent_ref_keys %} {{ref_key}} {%- if not loop.last %}, {% endif %}{% endfor %} ORDER BY {{ src_ldts }}) THEN FALSE
ELSE TRUE
END
),

{#
Expand All @@ -128,7 +104,7 @@ records_to_insert AS (
AND {{ datavault4dbt.multikey(ref_key, prefix=['latest_entries_in_sat', 'deduplicated_numbered_source'], condition='=') }}
{% endfor %}
AND {{ datavault4dbt.multikey(ns.hdiff_alias, prefix=['latest_entries_in_sat', 'deduplicated_numbered_source'], condition='=') }}
)
AND deduplicated_numbered_source.rn = 1)
{%- endif %}

)
Expand Down
42 changes: 11 additions & 31 deletions macros/tables/redshift/sat_v0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -42,32 +42,22 @@ source_data AS (

{# Get the latest record for each parent hashkey in existing sat, if incremental. #}
{%- if is_incremental() %}
latest_entries_in_sat_prep AS (

SELECT
{{ parent_hashkey }},
{{ ns.hdiff_alias }},
ROW_NUMBER() OVER(PARTITION BY {{ parent_hashkey|lower }} ORDER BY {{ src_ldts }} DESC) as rn
FROM
{{ this }}
),

latest_entries_in_sat AS (

SELECT
{{ parent_hashkey }},
{{ ns.hdiff_alias }}
FROM
latest_entries_in_sat_prep
WHERE rn = 1
{{ this }} redshift_requires_an_alias_if_the_qualify_is_directly_after_the_from
QUALIFY ROW_NUMBER() OVER(PARTITION BY {{ parent_hashkey }} ORDER BY {{ src_ldts }} DESC) = 1
),
{%- endif %}

{#
Deduplicate the source by comparing each hashdiff to the hashdiff of the previous record, for each hashkey.
Additionally, a row number based on that order is added in incremental runs.
#}
deduplicated_numbered_source_prep AS (
deduplicated_numbered_source AS (

SELECT
{{ parent_hashkey }},
Expand All @@ -76,23 +66,12 @@ deduplicated_numbered_source_prep AS (
{% if is_incremental() -%}
, ROW_NUMBER() OVER(PARTITION BY {{ parent_hashkey }} ORDER BY {{ src_ldts }}) as rn
{%- endif %}
, LAG({{ ns.hdiff_alias }}) OVER(PARTITION BY {{ parent_hashkey|lower }} ORDER BY {{ src_ldts }}) as prev_hashdiff
FROM source_data

),

deduplicated_numbered_source AS (

SELECT
{{ parent_hashkey }},
{{ ns.hdiff_alias }},
{{ datavault4dbt.print_list(source_cols) }}
FROM deduplicated_numbered_source_prep
WHERE 1=1
AND {{ ns.hdiff_alias }} <> prev_hashdiff OR prev_hashdiff IS NULL
{% if is_incremental() -%}
AND rn = 1
{%- endif %}
FROM source_data redshift_requires_an_alias_if_the_qualify_is_directly_after_the_from
QUALIFY
CASE
WHEN {{ ns.hdiff_alias }} = LAG({{ ns.hdiff_alias }}) OVER(PARTITION BY {{ parent_hashkey }} ORDER BY {{ src_ldts }}) THEN FALSE
ELSE TRUE
END
),

{#
Expand All @@ -111,7 +90,8 @@ records_to_insert AS (
SELECT 1
FROM latest_entries_in_sat
WHERE {{ datavault4dbt.multikey(parent_hashkey, prefix=['latest_entries_in_sat', 'deduplicated_numbered_source'], condition='=') }}
AND {{ datavault4dbt.multikey(ns.hdiff_alias, prefix=['latest_entries_in_sat', 'deduplicated_numbered_source'], condition='=') }})
AND {{ datavault4dbt.multikey(ns.hdiff_alias, prefix=['latest_entries_in_sat', 'deduplicated_numbered_source'], condition='=') }}
AND deduplicated_numbered_source.rn = 1)
{%- endif %}

)
Expand Down
Loading