287 feature extract multiple columns from one prejoined object #297

Closed
26 commits
98d001d
add macro to process new prejoin list syntax
tkiehn Nov 19, 2024
4b0e02d
add process_prejoined_columns macro to top-level stage macro
tkiehn Nov 19, 2024
07ec2de
change prejoin-logic to perform less joins
tkiehn Nov 20, 2024
fa4087e
add check and compilation error if a prejoined column is defined twice
tkiehn Nov 27, 2024
54b8720
add amount of extract_columns and aliases to amount-mismatch compilat…
tkiehn Nov 27, 2024
582af24
Merge branch 'main' into 287-feature-extract-multiple-columns-from-on…
tkiehn Nov 27, 2024
60b2a98
add prejoin with source to processing-macro
tkiehn Nov 27, 2024
3fed963
move stage_processing_macros.sql into staging folder
tkiehn Dec 9, 2024
edf3dc4
change extract_input_columns, process_prejoined_columns. add extract_…
tkiehn Dec 11, 2024
da7d000
add staging.yml with descriptions of process_prejoined_columns and ex…
tkiehn Dec 11, 2024
a970a0a
postgres: modify stage to handle new prejoin syntax and simplify sett…
tkiehn Dec 11, 2024
f8767ae
bigquery: stage: implement new prejoin syntax
tkiehn Dec 11, 2024
90dc5c8
databricks: stage: implement new prejoin syntax
tkiehn Dec 11, 2024
706b2af
exasol stage: implement new prejoin syntax
tkiehn Dec 11, 2024
20d012c
fabric stage: implement new prejoin syntax
tkiehn Dec 11, 2024
36617f3
oracle stage: implement new prejoin syntax
tkiehn Dec 11, 2024
7adba69
postgres stage add prepend_generated_by()
tkiehn Dec 11, 2024
3070e0a
redshift stage: implement new prejoin syntax
tkiehn Dec 11, 2024
ef8b375
snowflake stage: implement new prejoin syntax
tkiehn Dec 11, 2024
e891046
synapse stage: implement new prejoin syntax
tkiehn Dec 11, 2024
71bfee3
synapse stage: remove column name escaping in ghost record macro call
tkiehn Dec 12, 2024
e683e45
fabric stage fix escape column names
tkiehn Dec 12, 2024
3a86469
synapse, fabric stages: fix derived input columns
tkiehn Dec 12, 2024
57e21da
oracle stage: include col_size to ghost records
tkiehn Dec 12, 2024
c8df4d3
synapse stage: fix prejoin_column_names
tkiehn Dec 12, 2024
75189a2
Update oracle stage.sql, remove AS for join alias
tkiehn Jan 6, 2025
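
For orientation, the list-based prejoin syntax that these commits introduce could look roughly like the fragment below in a stage model's metadata. This is a sketch inferred only from the keys that the changed `stage.sql` reads (`extract_columns`, `aliases`, `ref_model`, `src_name`, `src_table`, `this_column_name`, `ref_column_name`, `operator`). All model, source, and column names are hypothetical, and the exact nesting is an assumption rather than the documented format.

```yaml
# Sketch only: every model, source, and column name below is hypothetical.
prejoined_columns:
  # One prejoin against a ref model: two columns extracted through a single join.
  - ref_model: customer_stage            # hypothetical model name
    extract_columns:
      - customer_bk
      - customer_region
    aliases:                             # one alias per extract column (1:1 mapping)
      - prejoined_customer_bk
      - prejoined_customer_region
    this_column_name: customer_id        # join column in the staged source
    ref_column_name: id                  # join column in the prejoined object
  # One prejoin against a source, joined on two conditions.
  - src_name: erp                        # hypothetical source name
    src_table: orders                    # hypothetical source table
    extract_columns: order_bk            # a single string is converted to a list
    aliases: prejoined_order_bk
    this_column_name:
      - order_id
      - order_line
    ref_column_name:
      - id
      - line_no
    operator: AND                        # the stage macro defaults to AND when omitted
```

Under these assumptions, each list entry produces one `left join` aliased `pj_<n>`, and a mismatch between the number of `aliases` and `extract_columns` raises a compilation error.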
177 changes: 98 additions & 79 deletions macros/staging/bigquery/stage.sql
@@ -96,7 +96,7 @@
{# Getting the column names for all additional columns #}
{%- set derived_column_names = datavault4dbt.extract_column_names(derived_columns) -%}
{%- set hashed_column_names = datavault4dbt.extract_column_names(hashed_columns) -%}
{%- set prejoined_column_names = datavault4dbt.extract_column_names(prejoined_columns) -%}
{%- set prejoined_column_names = datavault4dbt.extract_prejoin_column_names(prejoined_columns) -%}
{%- set missing_column_names = datavault4dbt.extract_column_names(missing_columns) -%}
{%- set exclude_column_names = hashed_column_names + prejoined_column_names + missing_column_names + ldts_rsrc_input_column_names %}
{%- set source_and_derived_column_names = (all_source_columns + derived_column_names) | unique | list -%}
@@ -183,6 +183,8 @@
{# Setting the ldts default datatype #}
{% set ldts_default_dtype = datavault4dbt.timestamp_default_dtype() %}

{{ datavault4dbt.prepend_generated_by() }}

WITH

{# Selecting everything that we need from the source relation. #}
@@ -256,26 +258,51 @@ missing_columns AS (
),
{%- endif -%}


{%- if datavault4dbt.is_something(prejoined_columns) %}
{# Prejoining Business Keys of other source objects for Link purposes #}
prejoined_columns AS (

SELECT
{% if final_columns_to_select | length > 0 -%}
{{ datavault4dbt.print_list(datavault4dbt.prefix(columns=datavault4dbt.escape_column_names(final_columns_to_select), prefix_str='lcte').split(',')) }}
{% endif %}
{%- for col, vals in prejoined_columns.items() -%}
,pj_{{loop.index}}.{{ vals['bk'] }} AS {{ col }}
{% endfor -%}
{%- endif -%}

{# Iterate over each prejoin, doing logic checks and generating the select-statements #}
{%- for prejoin in prejoined_columns -%}
{%- set prejoin_alias = 'pj_' + loop.index|string -%}

{# If extract_columns and/or aliases are passed as string convert them to a list so they can be used as iterators later #}
{%- if not datavault4dbt.is_list(prejoin['extract_columns'])-%}
{%- do prejoin.update({'extract_columns': [prejoin['extract_columns']]}) -%}
{%- endif -%}
{%- if not datavault4dbt.is_list(prejoin['aliases']) and datavault4dbt.is_something(prejoin['aliases']) -%}
{%- do prejoin.update({'aliases': [prejoin['aliases']]}) -%}
{%- endif -%}

{# If passed, make sure there are as many aliases as there are extract_columns, ensuring a 1:1 mapping #}
{%- if datavault4dbt.is_something(prejoin['aliases']) -%}
{%- if not prejoin['aliases']|length == prejoin['extract_columns']|length -%}
{%- do exceptions.raise_compiler_error("Prejoin aliases must have the same length as extract_columns. Got "
~ prejoin['extract_columns']|length ~ " extract_column(s) and " ~ prejoin['aliases']|length ~ " alias(es).") -%}
{%- endif -%}
{%- endif -%}

{# Generate the columns for the SELECT-statement #}
{%- for column in prejoin['extract_columns'] %}
,{{ prejoin_alias }}.{{ column }} {% if datavault4dbt.is_something(prejoin['aliases']) -%} AS {{ prejoin['aliases'][loop.index0] }} {% endif -%}
{%- endfor -%}
{%- endfor %}

FROM {{ last_cte }} lcte

{% for col, vals in prejoined_columns.items() %}
{# Iterate over prejoins and generate the join-statements #}
{%- for prejoin in prejoined_columns -%}

{%- if 'src_name' in vals.keys() or 'src_table' in vals.keys() -%}
{%- set relation = source(vals['src_name']|string, vals['src_table']) -%}
{%- elif 'ref_model' in vals.keys() -%}
{%- set relation = ref(vals['ref_model']) -%}
{%- if 'ref_model' in prejoin.keys() -%}
{% set relation = ref(prejoin['ref_model']) -%}
{%- elif 'src_name' in prejoin.keys() and 'src_table' in prejoin.keys() -%}
{%- set relation = source(prejoin['src_name']|string, prejoin['src_table']) -%}
{%- else -%}
{%- set error_message -%}
Prejoin error: Invalid target entity definition. Allowed are:
@@ -296,28 +323,25 @@ prejoined_columns AS (
ref_column_name: join_columns_in_ref_model

Got:
{{ col }}: {{ vals }}
{{ prejoin }}
{%- endset -%}

{%- do exceptions.raise_compiler_error(error_message) -%}
{%- endif -%}

{# This sets a default value for the operator that connects multiple joining conditions. Only when it is not set by user. #}
{%- if 'operator' not in vals.keys() -%}
{%- if 'operator' not in prejoin.keys() -%}
{%- set operator = 'AND' -%}
{%- else -%}
{%- set operator = vals['operator'] -%}
{%- set operator = prejoin['operator'] -%}
{%- endif -%}

{%- set prejoin_alias = 'pj_' + loop.index|string -%}

left join {{ relation }} as {{ prejoin_alias }}
on {{ datavault4dbt.multikey(columns=vals['this_column_name'], prefix=['lcte', prejoin_alias], condition='=', operator=operator, right_columns=vals['ref_column_name']) }}

{% endfor %}
{%- set prejoin_alias = 'pj_' + loop.index|string %}

left join {{ relation }} as {{ prejoin_alias }}
on {{ datavault4dbt.multikey(columns=prejoin['this_column_name'], prefix=['lcte', prejoin_alias], condition='=', operator=operator, right_columns=prejoin['ref_column_name']) }}
{%- endfor -%}

{% set last_cte = "prejoined_columns" -%}
{%- set final_columns_to_select = final_columns_to_select + prejoined_column_names %}
{%- set final_columns_to_select = final_columns_to_select + prejoined_column_names -%}
),
{%- endif -%}

@@ -444,65 +468,61 @@ unknown_values AS (

SELECT

{{ datavault4dbt.string_to_timestamp(timestamp_format, beginning_of_all_times) }} as {{ load_datetime_col_name }},
'{{ unknown_value_rsrc }}' as {{ record_source_col_name }}
{{ datavault4dbt.string_to_timestamp(timestamp_format, beginning_of_all_times) }} as {{ load_datetime_col_name }}
,'{{ unknown_value_rsrc }}' as {{ record_source_col_name }}

{%- if columns_without_excluded_columns is defined and columns_without_excluded_columns| length > 0 -%},
{%- if columns_without_excluded_columns is defined and columns_without_excluded_columns| length > 0 -%}
{# Generating Ghost Records for all source columns, except the ldts, rsrc & edwSequence column #}
{%- for column in columns_without_excluded_columns %}
{{ datavault4dbt.ghost_record_per_datatype(column_name=column.name, datatype=column.dtype, ghost_record_type='unknown') }}
{%- if not loop.last %},{% endif -%}
,{{ datavault4dbt.ghost_record_per_datatype(column_name=column.name, datatype=column.dtype, ghost_record_type='unknown') }}
{%- endfor -%}

{%- endif -%}

{%- if datavault4dbt.is_something(missing_columns) -%},
{%- if datavault4dbt.is_something(missing_columns) -%}
{# Additionally generating ghost record for missing columns #}
{%- for col, dtype in missing_columns.items() %}
{{ datavault4dbt.ghost_record_per_datatype(column_name=col, datatype=dtype, ghost_record_type='unknown') }}
{%- if not loop.last %},{% endif -%}
,{{- datavault4dbt.ghost_record_per_datatype(column_name=col, datatype=dtype, ghost_record_type='unknown') }}
{%- endfor -%}
{%- endif -%}

{%- if datavault4dbt.is_something(prejoined_columns) -%},
{# Additionally generating ghost records for the prejoined attributes#}
{% for col, vals in prejoined_columns.items() %}
{%- if datavault4dbt.is_something(prejoined_columns) -%}
{# Additionally generating ghost records for the prejoined attributes #}
{%- for prejoin in prejoined_columns -%}

{%- if 'src_name' in vals.keys() or 'src_table' in vals.keys() -%}
{%- set relation = source(vals['src_name']|string, vals['src_table']) -%}
{%- elif 'ref_model' in vals.keys() -%}
{%- set relation = ref(vals['ref_model']) -%}
{%- if 'ref_model' in prejoin.keys() -%}
{%- set relation = ref(prejoin['ref_model']) -%}
{%- elif 'src_name' in prejoin.keys() and 'src_table' in prejoin.keys() -%}
{%- set relation = source(prejoin['src_name']|string, prejoin['src_table']) -%}
{%- endif -%}

{%- set pj_relation_columns = adapter.get_columns_in_relation( relation ) -%}
{{ log('pj_relation_columns: ' ~ pj_relation_columns, false ) }}

{% for column in pj_relation_columns -%}

{% if column.name|lower == vals['bk']|lower -%}
{{ log('column found? yes, for column :' ~ column.name , false) }}
{{ datavault4dbt.ghost_record_per_datatype(column_name=column.name, datatype=column.dtype, ghost_record_type='unknown', alias=col) }}
{{ log('pj_relation_columns for '~relation~': ' ~ pj_relation_columns, false ) }}

{%- for column in pj_relation_columns -%}
{%- if column.name|lower in prejoin['extract_columns']|map('lower') -%}
{%- set prejoin_extract_cols_lower = prejoin['extract_columns']|map('lower')|list -%}
{%- set prejoin_col_index = prejoin_extract_cols_lower.index(column.name|lower) -%}
{{ log('column found? yes, for column: ' ~ column.name , false) }}
,{{ datavault4dbt.ghost_record_per_datatype(column_name=column.name, datatype=column.dtype, ghost_record_type='unknown', alias=prejoin['aliases'][prejoin_col_index]) }}
{%- endif -%}

{%- endfor -%}
{%- if not loop.last %},{% endif %}
{% endfor -%}
{%- endif %}

{%- if datavault4dbt.is_something(derived_columns) -%},
{# Additionally generating Ghost Records for Derived Columns #}
{%- for column_name, properties in derived_columns_with_datatypes_DICT.items() %}
{{ datavault4dbt.ghost_record_per_datatype(column_name=column_name, datatype=properties.datatype, ghost_record_type='unknown') }}
{%- if not loop.last %},{% endif -%}
{%- if datavault4dbt.is_something(derived_columns) -%}
{# Additionally generating Ghost Records for Derived Columns #}
{% for column_name, properties in derived_columns_with_datatypes_DICT.items() %}
,{{ datavault4dbt.ghost_record_per_datatype(column_name=column_name, datatype=properties.datatype, ghost_record_type='unknown') }}
{%- endfor -%}

{%- endif -%}

{%- if datavault4dbt.is_something(processed_hash_columns) -%},
{%- if datavault4dbt.is_something(processed_hash_columns) -%}

{%- for hash_column in processed_hash_columns %}
CAST({{ datavault4dbt.as_constant(column_str=unknown_key) }} as {{ hash_dtype }}) as {{ hash_column }}
{%- if not loop.last %},{% endif %}
,CAST({{ datavault4dbt.as_constant(column_str=unknown_key) }} as {{ hash_dtype }}) as {{ hash_column }}
{%- endfor -%}

{%- endif -%}
@@ -514,62 +534,61 @@ error_values AS (

SELECT

{{ datavault4dbt.string_to_timestamp(timestamp_format , end_of_all_times) }} as {{ load_datetime_col_name }},
'{{ error_value_rsrc }}' as {{ record_source_col_name }}
{{ datavault4dbt.string_to_timestamp(timestamp_format , end_of_all_times) }} as {{ load_datetime_col_name }}
,'{{ error_value_rsrc }}' as {{ record_source_col_name }}

{%- if columns_without_excluded_columns is defined and columns_without_excluded_columns| length > 0 -%},
{%- if columns_without_excluded_columns is defined and columns_without_excluded_columns| length > 0 -%}
{# Generating Ghost Records for Source Columns #}
{%- for column in columns_without_excluded_columns %}
{{ datavault4dbt.ghost_record_per_datatype(column_name=column.name, datatype=column.dtype, ghost_record_type='error') }}
{%- if not loop.last %},{% endif -%}
,{{ datavault4dbt.ghost_record_per_datatype(column_name=column.name, datatype=column.dtype, ghost_record_type='error') }}
{%- endfor -%}

{%- endif -%}

{%- if datavault4dbt.is_something(missing_columns) -%},
{%- if datavault4dbt.is_something(missing_columns) -%}
{# Additionally generating ghost record for Missing columns #}
{%- for col, dtype in missing_columns.items() %}
{{ datavault4dbt.ghost_record_per_datatype(column_name=col, datatype=dtype, ghost_record_type='error') }}
{%- if not loop.last %},{% endif -%}
,{{ datavault4dbt.ghost_record_per_datatype(column_name=col, datatype=dtype, ghost_record_type='error') }}
{%- endfor -%}
{%- endif -%}

{%- if datavault4dbt.is_something(prejoined_columns) -%},
{# Additionally generating ghost records for the prejoined attributes #}
{%- for col, vals in prejoined_columns.items() %}
{%- if datavault4dbt.is_something(prejoined_columns) -%}
{# Additionally generating ghost records for the prejoined attributes#}
{% for prejoin in prejoined_columns %}

{%- if 'src_name' in vals.keys() or 'src_table' in vals.keys() -%}
{%- set relation = source(vals['src_name']|string, vals['src_table']) -%}
{%- elif 'ref_model' in vals.keys() -%}
{%- set relation = ref(vals['ref_model']) -%}
{%- if 'ref_model' in prejoin.keys() -%}
{% set relation = ref(prejoin['ref_model']) -%}
{%- elif 'src_name' in prejoin.keys() and 'src_table' in prejoin.keys() -%}
{%- set relation = source(prejoin['src_name']|string, prejoin['src_table']) -%}
{%- endif -%}

{%- set pj_relation_columns = adapter.get_columns_in_relation( relation ) -%}
{{- log('pj_relation_columns for '~relation~': ' ~ pj_relation_columns, false ) -}}

{% for column in pj_relation_columns -%}
{% if column.name|lower == vals['bk']|lower -%}
{{ datavault4dbt.ghost_record_per_datatype(column_name=column.name, datatype=column.dtype, ghost_record_type='error', alias=col) -}}
{%- if column.name|lower in prejoin['extract_columns']|map('lower') -%}
{%- set prejoin_extract_cols_lower = prejoin['extract_columns']|map('lower')|list -%}
{%- set prejoin_col_index = prejoin_extract_cols_lower.index(column.name|lower) -%}
{{ log('column found? yes, for column: ' ~ column.name , false) }}
,{{ datavault4dbt.ghost_record_per_datatype(column_name=column.name, datatype=column.dtype, ghost_record_type='error', alias=prejoin['aliases'][prejoin_col_index]) }}
{%- endif -%}

{%- endfor -%}
{%- if not loop.last -%},{%- endif %}
{% endfor -%}
{%- endif %}

{%- endif -%}

{%- if datavault4dbt.is_something(derived_columns) %},
{%- if datavault4dbt.is_something(derived_columns) %}
{# Additionally generating Ghost Records for Derived Columns #}
{%- for column_name, properties in derived_columns_with_datatypes_DICT.items() %}
{{ datavault4dbt.ghost_record_per_datatype(column_name=column_name, datatype=properties.datatype, ghost_record_type='error') }}
{%- if not loop.last %},{% endif %}
,{{ datavault4dbt.ghost_record_per_datatype(column_name=column_name, datatype=properties.datatype, ghost_record_type='error') }}
{%- endfor -%}

{%- endif -%}

{%- if datavault4dbt.is_something(processed_hash_columns) -%},
{%- if datavault4dbt.is_something(processed_hash_columns) -%}

{%- for hash_column in processed_hash_columns %}
CAST({{ datavault4dbt.as_constant(column_str=error_key) }} as {{ hash_dtype }}) as {{ hash_column }}
{%- if not loop.last %},{% endif %}
,CAST({{ datavault4dbt.as_constant(column_str=error_key) }} as {{ hash_dtype }}) as {{ hash_column }}
{%- endfor -%}

{%- endif -%}