feat: Allow different schema for tmp tables created during table materialization #664
base: main
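For context, the feature is opt-in per model through a temp_schema config key, which the table materialization below reads with config.get('temp_schema'). A minimal, hypothetical model showing how a project might use it (the schema name analytics_tmp is purely illustrative and not defined by this PR):

    {{ config(
        materialized='table',
        table_type='iceberg',
        temp_schema='analytics_tmp'
    ) }}

    select 1 as id

With this config, the intermediate __ha table built on subsequent runs is meant to be created in analytics_tmp instead of the model's target schema and then renamed into place, which is what the functional test at the end of this PR asserts.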
Diff 1: adapter macros, adding a set_table_relation_schema helper and a temp_schema argument to make_temp_relation.
@@ -36,6 +36,16 @@
    {%- endcall %}
{%- endmacro %}

{% macro set_table_relation_schema(relation, schema) %}
    {%- if temp_schema is not none -%}
Review comment (submitted with a suggested change): could you explain how temp_schema gets resolved here? The macro only receives relation and schema. (A possible fix is sketched after this diff.)
        {%- set relation = relation.incorporate(path={
            "schema": schema
        }) -%}
        {%- do create_schema(relation) -%}
    {% endif %}
    {{ return(relation) }}
{% endmacro %}

{% macro make_temp_relation(base_relation, suffix='__dbt_tmp', temp_schema=none) %}
    {%- set temp_identifier = base_relation.identifier ~ suffix -%}
    {%- set temp_relation = base_relation.incorporate(path={"identifier": temp_identifier}) -%}
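Regarding the review question above: the guard references temp_schema, but set_table_relation_schema only receives relation and schema, so temp_schema is never defined inside the macro. A minimal corrected sketch, assuming the intent is simply "rewrite the schema whenever one is passed in":

    {% macro set_table_relation_schema(relation, schema) %}
        {#- Only rewrite the relation when a target schema was actually passed in -#}
        {%- if schema is not none -%}
            {%- set relation = relation.incorporate(path={"schema": schema}) -%}
            {#- Make sure the schema exists before anything is created in it -#}
            {%- do create_schema(relation) -%}
        {%- endif -%}
        {{ return(relation) }}
    {% endmacro %}

Only the condition changes; the incorporate and create_schema calls are taken from the diff above.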
Diff 2: the table materialization, which reads the new temp_schema config and redirects the tmp/HA relations to it.
@@ -7,10 +7,14 @@
  {%- set lf_grants = config.get('lf_grants') -%}

  {%- set table_type = config.get('table_type', default='hive') | lower -%}
  {%- set temp_schema = config.get('temp_schema') -%}
  {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
  {%- set old_tmp_relation = adapter.get_relation(identifier=identifier ~ '__ha',
                                                  schema=schema,
                                                  database=database) -%}
  {%- if temp_schema is not none and old_tmp_relation is not none -%}
Review comment: I don't fully get this check. Why does it also require old_tmp_relation to not be none? If old_tmp_relation is none, we still want to create the new tmp table in the tmp schema. (One possible simplification is sketched after this diff.)
    {%- set old_tmp_relation = set_table_relation_schema(relation=old_tmp_relation, schema=temp_schema) -%}
  {%- endif -%}
  {%- set old_bkp_relation = adapter.get_relation(identifier=identifier ~ '__bkp',
                                                  schema=schema,
                                                  database=database) -%}
@@ -31,6 +35,9 @@
                                                database=database,
                                                s3_path_table_part=target_relation.identifier,
                                                type='table') -%}
  {%- if temp_schema is not none -%}
    {%- set tmp_relation = set_table_relation_schema(relation=tmp_relation, schema=temp_schema) -%}
  {%- endif -%}

  {%- if (
      table_type == 'hive'
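On the check questioned above: adapter.get_relation() looks for identifier ~ '__ha' in the model's own schema, so a leftover tmp table that a previous run created in the temp schema would not be found at all, and the none-guard then skips the schema rewrite. One possible simplification, sketched here as an untested suggestion rather than part of the PR, is to resolve the old tmp relation directly in the configured temp schema:

    {#- Hypothetical alternative: look the old '__ha' relation up in the temp schema
        when one is configured, falling back to the model's schema otherwise -#}
    {%- set ha_schema = temp_schema if temp_schema is not none else schema -%}
    {%- set old_tmp_relation = adapter.get_relation(identifier=identifier ~ '__ha',
                                                    schema=ha_schema,
                                                    database=database) -%}

The name ha_schema is illustrative only; the rest of the materialization would keep using old_tmp_relation exactly as in the diff.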
Diff 3: a new functional test covering the temp_schema behaviour for an Iceberg table model.
@@ -0,0 +1,118 @@
import pytest
import yaml
from tests.functional.adapter.utils.parse_dbt_run_output import (
    extract_create_statement_table_names,
    extract_rename_statement_table_names,
    extract_running_ddl_statements,
)

from dbt.contracts.results import RunStatus
from dbt.tests.util import run_dbt

models__iceberg_table = """
{{ config(
        materialized='table',
        table_type='iceberg',
        temp_schema=var('temp_schema_name'),
    )
}}

select
    {{ var('test_id') }} as id
"""


class TestTableIcebergTableUnique:
    @pytest.fixture(scope="class")
    def models(self):
        return {"models__iceberg_table.sql": models__iceberg_table}
    def test__temp_schema_name_iceberg_table(self, project, capsys):
        # Model under test, plus a dedicated "<test_schema>_tmp" schema that the
        # tmp/HA table should be created in.
        relation_name = "models__iceberg_table"
        temp_schema_name = f"{project.test_schema}_tmp"
        drop_temp_schema = f"drop schema if exists `{temp_schema_name}` cascade"
        # Queries used to verify the contents of the final table after each run.
        model_run_result_row_count_query = f"select count(*) as records from {project.test_schema}.{relation_name}"
        model_run_result_test_id_query = f"select id from {project.test_schema}.{relation_name}"

        vars_dict = {
            "temp_schema_name": temp_schema_name,
            "test_id": 1,
        }

        # First run: the target table does not exist yet, so the model should be
        # created directly in the test schema without touching the temp schema.
        model_run = run_dbt(
            [
                "run",
                "--select",
                relation_name,
                "--vars",
                yaml.safe_dump(vars_dict),
                "--log-level",
                "debug",
                "--log-format",
                "json",
            ]
        )

        model_run_result = model_run.results[0]
        assert model_run_result.status == RunStatus.Success

        # Exactly one CREATE TABLE ran, and it did not target the temp schema.
        out, _ = capsys.readouterr()
        athena_running_create_statements = extract_running_ddl_statements(out, relation_name, "create table")
        assert len(athena_running_create_statements) == 1

        incremental_model_run_result_table_name = extract_create_statement_table_names(
            athena_running_create_statements[0]
        )[0]
        assert temp_schema_name not in incremental_model_run_result_table_name

        # The table contains the single row with the id passed via --vars.
        model_records_count = project.run_sql(model_run_result_row_count_query, fetch="all")[0][0]
        assert model_records_count == 1

        model_test_id_in_table = project.run_sql(model_run_result_test_id_query, fetch="all")[0][0]
        assert model_test_id_in_table == 1
vars_dict["test_id"] = 2 | ||
|
||
model_run = run_dbt( | ||
[ | ||
"run", | ||
"--select", | ||
relation_name, | ||
"--vars", | ||
yaml.safe_dump(vars_dict), | ||
"--log-level", | ||
"debug", | ||
"--log-format", | ||
"json", | ||
] | ||
) | ||
|
||
model_run_result = model_run.results[0] | ||
assert model_run_result.status == RunStatus.Success | ||
|
||
model_records_count = project.run_sql(model_run_result_row_count_query, fetch="all")[0][0] | ||
assert model_records_count == 1 | ||
|
||
model_test_id_in_table = project.run_sql(model_run_result_test_id_query, fetch="all")[0][0] | ||
assert model_test_id_in_table == 2 | ||
|
||
out, _ = capsys.readouterr() | ||
athena_running_create_statements = extract_running_ddl_statements(out, relation_name, "create table") | ||
assert len(athena_running_create_statements) == 1 | ||
|
||
model_run_2_result_table_name = extract_create_statement_table_names(athena_running_create_statements[0])[0] | ||
assert temp_schema_name in model_run_2_result_table_name | ||
|
||
athena_running_alter_statements = extract_running_ddl_statements(out, relation_name, "alter table") | ||
assert len(athena_running_alter_statements) == 1 | ||
|
||
athena_running_alter_statement_tables = extract_rename_statement_table_names(athena_running_alter_statements[0]) | ||
athena_running_alter_statement_origin_table = athena_running_alter_statement_tables.get("alter_table_names")[0] | ||
athena_running_alter_statement_renamed_to_table = athena_running_alter_statement_tables.get( | ||
"rename_to_table_names" | ||
)[0] | ||
|
||
assert temp_schema_name in athena_running_alter_statement_origin_table | ||
assert athena_running_alter_statement_renamed_to_table == f"`{project.test_schema}`.`{relation_name}`" | ||
|
||
project.run_sql(drop_temp_schema) |
Review comment: could you add some comments to this method?