chore: Apply non-functional refactoring and fix typos #281

Status: Open. Wants to merge 10 commits into base: main. Showing changes from 9 commits.
4 changes: 3 additions & 1 deletion pyproject.toml
@@ -88,7 +88,6 @@ select = [
"F", # Pyflakes
"W", # pycodestyle warnings
"E", # pycodestyle errors
"FA", # flake8-future-annotations
"I", # isort
"N", # pep8-naming
"D", # pydocsyle
@@ -97,6 +96,9 @@ select = [
"RET", # flake8-return
"SIM", # flake8-simplify
"TCH", # flake8-type-checking
"ERA", # eradicate
"PGH", # pygrep-hooks
"PL", # Pylint
"PERF", # Perflint
"RUF", # ruff
]
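A note on the `# noqa` markers that recur through this diff: the block added here enables the `ERA` (eradicate), `PGH` (pygrep-hooks), `PL` (Pylint), and `PERF` (Perflint) rule families. Pylint's PLR0913 ("too many arguments") fires on any function taking more than five parameters, Ruff's default limit, which the SDK-mandated override signatures cannot avoid, and `ERA` flags commented-out code, which is why an `ERA001` suppression appears in sinks.py below. A hypothetical sketch of a signature that trips PLR0913 and the suppression used here:

```python
# Hypothetical example: Ruff's default pylint max-args is 5, so this
# six-parameter signature triggers PLR0913 unless suppressed.
def prepare(  # noqa: PLR0913
    name: str,
    schema: dict,
    primary_keys: list[str],
    partition_keys: list[str] | None,
    as_temp_table: bool,
    connection: object,
) -> None:
    """Six parameters: one over the default limit of five."""
```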
37 changes: 19 additions & 18 deletions target_postgres/connector.py
@@ -4,7 +4,9 @@

import atexit
import io
import itertools
import signal
import sys
import typing as t
from contextlib import contextmanager
from functools import cached_property
@@ -95,7 +97,7 @@ def interpret_content_encoding(self) -> bool:
"""
return self.config.get("interpret_content_encoding", False)

def prepare_table( # type: ignore[override]
def prepare_table( # type: ignore[override] # noqa: PLR0913
self,
full_table_name: str | FullyQualifiedName,
schema: dict,
@@ -121,7 +123,7 @@ def prepare_table( # type: ignore[override]
meta = sa.MetaData(schema=schema_name)
table: sa.Table
if not self.table_exists(full_table_name=full_table_name):
table = self.create_empty_table(
return self.create_empty_table(
table_name=table_name,
meta=meta,
schema=schema,
@@ -130,7 +132,6 @@
as_temp_table=as_temp_table,
connection=connection,
)
return table
meta.reflect(connection, only=[table_name])
table = meta.tables[
full_table_name
@@ -182,10 +183,12 @@ def copy_table_structure(
raise RuntimeError("Table already exists")

columns = [column._copy() for column in from_table.columns]

if as_temp_table:
new_table = sa.Table(table_name, meta, *columns, prefixes=["TEMPORARY"])
new_table.create(bind=connection)
return new_table

new_table = sa.Table(table_name, meta, *columns)
new_table.create(bind=connection)
return new_table
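Both `prepare_table` above and `copy_table_structure` here now return directly from the temp-table branch instead of assigning to a variable that is returned later, the guard-clause style that the already-enabled flake8-return (`RET`) rules favor. A minimal sketch of the resulting shape, with hypothetical names:

```python
import sqlalchemy as sa

def create_table(
    table_name: str,
    meta: sa.MetaData,
    columns: list[sa.Column],
    connection: sa.engine.Connection,
    as_temp_table: bool,
) -> sa.Table:
    if as_temp_table:
        # Guard clause: the TEMPORARY case exits immediately, so no shared
        # variable needs to survive past the branch.
        new_table = sa.Table(table_name, meta, *columns, prefixes=["TEMPORARY"])
        new_table.create(bind=connection)
        return new_table

    new_table = sa.Table(table_name, meta, *columns)
    new_table.create(bind=connection)
    return new_table
```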
@@ -199,7 +202,7 @@ def drop_table(self, table: sa.Table, connection: sa.engine.Connection):
"""Drop table data."""
table.drop(bind=connection)

def clone_table(
def clone_table( # noqa: PLR0913
self, new_table_name, table, metadata, connection, temp_table
) -> sa.Table:
"""Clone a table."""
@@ -269,7 +272,7 @@ def to_sql_type(self, jsonschema_type: dict) -> sa.types.TypeEngine: # type: ig

return PostgresConnector.pick_best_sql_type(sql_type_array=sql_type_array)

def pick_individual_type(self, jsonschema_type: dict):
def pick_individual_type(self, jsonschema_type: dict): # noqa: PLR0911
"""Select the correct sql type assuming jsonschema_type has only a single type.

Args:
@@ -324,9 +327,8 @@ def pick_individual_type(self, jsonschema_type: dict):
):
return HexByteString()
individual_type = th.to_sql_type(jsonschema_type)
if isinstance(individual_type, VARCHAR):
return TEXT()
return individual_type

return TEXT() if isinstance(individual_type, VARCHAR) else individual_type
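The collapsed return is behavior-preserving: whatever type the SDK's `th.to_sql_type` yields, a length-limited `VARCHAR` is widened to unbounded `TEXT`, which in Postgres carries no performance penalty. A one-liner showing the conditional expression that replaced the `if`/`return` pair:

```python
from sqlalchemy import TEXT, VARCHAR

def widen(individual_type):
    # Conditional expression replacing the earlier isinstance if/else.
    return TEXT() if isinstance(individual_type, VARCHAR) else individual_type

print(type(widen(VARCHAR(255))).__name__)  # TEXT
```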

@staticmethod
def pick_best_sql_type(sql_type_array: list):
@@ -355,13 +357,12 @@ def pick_best_sql_type(sql_type_array: list):
NOTYPE,
]

for sql_type in precedence_order:
for obj in sql_type_array:
if isinstance(obj, sql_type):
return obj
for sql_type, obj in itertools.product(precedence_order, sql_type_array):
if isinstance(obj, sql_type):
return obj
return TEXT()
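`itertools.product(precedence_order, sql_type_array)` yields `(sql_type, obj)` pairs in exactly the order the nested loops did (the right-hand iterable varies fastest), so the first match still respects the precedence ranking. A standalone illustration, with strings standing in for the SQLAlchemy type classes:

```python
import itertools

precedence_order = ["ARRAY", "JSONB", "TEXT"]  # highest priority first
sql_type_array = ["TEXT", "JSONB"]             # candidates, in arrival order

# product(A, B) visits pairs exactly like `for a in A: for b in B:`,
# so candidates are checked rank by rank and the best-ranked match wins.
for sql_type, obj in itertools.product(precedence_order, sql_type_array):
    if obj == sql_type:
        print("picked", obj)  # picked JSONB, despite TEXT arriving first
        break
```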

def create_empty_table( # type: ignore[override]
def create_empty_table( # type: ignore[override] # noqa: PLR0913
self,
table_name: str,
meta: sa.MetaData,
@@ -418,7 +419,7 @@ def create_empty_table( # type: ignore[override]
new_table.create(bind=connection)
return new_table

def prepare_column(
def prepare_column( # noqa: PLR0913
self,
full_table_name: str | FullyQualifiedName,
column_name: str,
@@ -466,7 +467,7 @@ def prepare_column(
column_object=column_object,
)

def _create_empty_column( # type: ignore[override]
def _create_empty_column( # type: ignore[override] # noqa: PLR0913
self,
schema_name: str,
table_name: str,
@@ -531,7 +532,7 @@ def get_column_add_ddl( # type: ignore[override]
},
)

def _adapt_column_type( # type: ignore[override]
def _adapt_column_type( # type: ignore[override] # noqa: PLR0913
self,
schema_name: str,
table_name: str,
@@ -669,7 +670,7 @@ def get_sqlalchemy_query(self, config: dict) -> dict:
# ssl_enable is for verifying the server's identity to the client.
if config["ssl_enable"]:
ssl_mode = config["ssl_mode"]
query.update({"sslmode": ssl_mode})
query["sslmode"] = ssl_mode
query["sslrootcert"] = self.filepath_or_certificate(
value=config["ssl_certificate_authority"],
alternative_name=config["ssl_storage_directory"] + "/root.crt",
@@ -764,7 +765,7 @@ def catch_signal(self, signum, frame) -> None:
signum: The signal number
frame: The current stack frame
"""
exit(1) # Calling this to be sure atexit is called, so clean_up gets called
sys.exit(1) # Calling this to be sure atexit is called, so clean_up gets called
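The switch from the built-in `exit()` to `sys.exit()` is more than cosmetic: `exit` is only injected by the `site` module (it can be absent under `python -S` or in embedded interpreters) and is meant for interactive sessions, while `sys.exit` is the documented way to raise `SystemExit`, which unwinds normally so `atexit`-registered cleanup still runs. A minimal sketch of the pattern, with a hypothetical cleanup function:

```python
import atexit
import signal
import sys

def clean_up() -> None:
    print("closing connections")  # hypothetical cleanup work

atexit.register(clean_up)

def catch_signal(signum, frame) -> None:
    # sys.exit raises SystemExit, so the interpreter unwinds normally and
    # the atexit handler above still executes before the process dies.
    sys.exit(1)

signal.signal(signal.SIGTERM, catch_signal)
```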

def _get_column_type( # type: ignore[override]
self,
35 changes: 17 additions & 18 deletions target_postgres/sinks.py
@@ -112,14 +112,14 @@ def process_batch(self, context: dict) -> None:

def generate_temp_table_name(self):
"""Uuid temp table name."""
# sa.exc.IdentifierError: Identifier
# sa.exc.IdentifierError: Identifier # noqa: ERA001
# 'temp_test_optional_attributes_388470e9_fbd0_47b7_a52f_d32a2ee3f5f6'
# exceeds maximum length of 63 characters
# Is hit if we have a long table name, there is no limit on Temporary tables
# in postgres, used a guid just in case we are using the same session
return f"{str(uuid.uuid4()).replace('-', '_')}"
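The comment block explains the bare-UUID choice: Postgres truncates identifiers at 63 characters (`NAMEDATALEN - 1`), and SQLAlchemy raises `IdentifierError` rather than silently truncating, so prefixing a long stream name onto the temp table name would fail. A UUID with hyphens replaced by underscores is always 36 characters, comfortably under the cap:

```python
import uuid

name = str(uuid.uuid4()).replace("-", "_")
print(name)       # e.g. 388470e9_fbd0_47b7_a52f_d32a2ee3f5f6
print(len(name))  # 36, well under Postgres's 63-character identifier limit
```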

def bulk_insert_records( # type: ignore[override]
def bulk_insert_records( # type: ignore[override] # noqa: PLR0913
self,
table: sa.Table,
schema: dict,
@@ -159,24 +159,24 @@ def bulk_insert_records( # type: ignore[override]
if self.append_only is False:
insert_records: dict[str, dict] = {} # pk : record
for record in records:
insert_record = {}
for column in columns:
insert_record[column.name] = record.get(column.name)
insert_record = {
column.name: record.get(column.name) for column in columns
}
# No need to check for a KeyError here because the SDK already
# guarantees that all key properties exist in the record.
primary_key_value = "".join([str(record[key]) for key in primary_keys])
insert_records[primary_key_value] = insert_record
data_to_insert = list(insert_records.values())
else:
for record in records:
insert_record = {}
for column in columns:
insert_record[column.name] = record.get(column.name)
insert_record = {
column.name: record.get(column.name) for column in columns
}
data_to_insert.append(insert_record)
connection.execute(insert, data_to_insert)
return True
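Both branches now build each row with a dict comprehension keyed by column name; the non-append-only branch additionally indexes rows by their concatenated primary key values, so a later record with the same key silently replaces an earlier one and only the final version is inserted. A compact sketch of that dedup behavior, with made-up records:

```python
records = [
    {"id": 1, "name": "first"},
    {"id": 2, "name": "other"},
    {"id": 1, "name": "second"},  # same primary key as the first record
]
primary_keys = ["id"]

insert_records: dict[str, dict] = {}  # pk : record
for record in records:
    # Concatenated key values index the dict, so duplicates overwrite.
    primary_key_value = "".join(str(record[key]) for key in primary_keys)
    insert_records[primary_key_value] = record

print(list(insert_records.values()))
# [{'id': 1, 'name': 'second'}, {'id': 2, 'name': 'other'}]
```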

def upsert(
def upsert( # noqa: PLR0913
self,
from_table: sa.Table,
to_table: sa.Table,
@@ -252,14 +252,13 @@ def column_representation(
schema: dict,
) -> list[sa.Column]:
"""Return a sqlalchemy table representation for the current schema."""
columns: list[sa.Column] = []
for property_name, property_jsonschema in schema["properties"].items():
columns.append(
sa.Column(
property_name,
self.connector.to_sql_type(property_jsonschema),
)
columns: list[sa.Column] = [
sa.Column(
property_name,
self.connector.to_sql_type(property_jsonschema),
)
for property_name, property_jsonschema in schema["properties"].items()
]
return columns
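The append loop becomes a single list comprehension: one `sa.Column` per property in the stream's JSON schema, typed by the connector. A self-contained sketch with a two-property schema and a simplified stand-in for `to_sql_type`:

```python
import sqlalchemy as sa

schema = {"properties": {"id": {"type": "integer"}, "name": {"type": "string"}}}

def to_sql_type(prop: dict) -> sa.types.TypeEngine:
    # Simplified stand-in for connector.to_sql_type().
    return sa.BIGINT() if prop["type"] == "integer" else sa.TEXT()

columns = [
    sa.Column(property_name, to_sql_type(property_jsonschema))
    for property_name, property_jsonschema in schema["properties"].items()
]
print([column.name for column in columns])  # ['id', 'name']
```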

def generate_insert_statement(
@@ -289,12 +288,12 @@ def schema_name(self) -> str | None:
"""Return the schema name or `None` if using names with no schema part.

Note that after the next SDK release (after 0.14.0) we can remove this
as it's already upstreamed.
as it's already up-streamed.

Returns:
The target schema name.
"""
# Look for a default_target_scheme in the configuration fle
# Look for a default_target_scheme in the configuration file
default_target_schema: str = self.config.get("default_target_schema", None)
parts = self.stream_name.split("-")

2 changes: 1 addition & 1 deletion target_postgres/target.py
@@ -178,7 +178,7 @@ def __init__(
th.BooleanType,
default=False,
description=(
"When activate version is sent from a tap this specefies "
"When activate version is sent from a tap this specifies "
+ "if we should delete the records that don't match, or mark "
+ "them with a date in the `_sdc_deleted_at` column. This config "
+ "option is ignored if `activate_version` is set to false."
87 changes: 38 additions & 49 deletions target_postgres/tests/test_target_postgres.py
@@ -78,11 +78,7 @@ def singer_file_to_target(file_name, target) -> None:


def remove_metadata_columns(row: dict) -> dict:
new_row = {}
for column in row.keys():
if not column.startswith("_sdc"):
new_row[column] = row[column]
return new_row
return {column: row[column] for column in row if not column.startswith("_sdc")}
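The helper is now a single dict comprehension that strips Singer's bookkeeping columns (anything prefixed `_sdc`, such as `_sdc_deleted_at`) before rows are compared against expected data:

```python
def remove_metadata_columns(row: dict) -> dict:
    # Keep only business columns; _sdc* fields are Singer metadata.
    return {column: row[column] for column in row if not column.startswith("_sdc")}

print(remove_metadata_columns({"id": 1, "_sdc_deleted_at": None, "name": "x"}))
# {'id': 1, 'name': 'x'}
```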


def verify_data(
@@ -106,43 +102,43 @@ def verify_data(
engine = create_engine(target)
full_table_name = f"{target.config['default_target_schema']}.{table_name}"
with engine.connect() as connection:
if primary_key is not None and check_data is not None:
if isinstance(check_data, dict):
result = connection.execute(
sqlalchemy.text(
f"SELECT * FROM {full_table_name} ORDER BY {primary_key}"
)
)
assert result.rowcount == number_of_rows
result_dict = remove_metadata_columns(result.first()._asdict())
assert result_dict == check_data
elif isinstance(check_data, list):
result = connection.execute(
sqlalchemy.text(
f"SELECT * FROM {full_table_name} ORDER BY {primary_key}"
)
)
assert result.rowcount == number_of_rows
result_dict = [
remove_metadata_columns(row._asdict()) for row in result.all()
]

# bytea columns are returned as memoryview objects
# we need to convert them to bytes to allow comparison with check_data
for row in result_dict:
for col in row:
if isinstance(row[col], memoryview):
row[col] = bytes(row[col])

assert result_dict == check_data
else:
raise ValueError("Invalid check_data - not dict or list of dicts")
else:
if primary_key is None or check_data is None:
result = connection.execute(
sqlalchemy.text(f"SELECT COUNT(*) FROM {full_table_name}")
)
assert result.first()[0] == number_of_rows

elif isinstance(check_data, dict):
result = connection.execute(
sqlalchemy.text(
f"SELECT * FROM {full_table_name} ORDER BY {primary_key}"
)
)
assert result.rowcount == number_of_rows
result_dict = remove_metadata_columns(result.first()._asdict())
assert result_dict == check_data
elif isinstance(check_data, list):
result = connection.execute(
sqlalchemy.text(
f"SELECT * FROM {full_table_name} ORDER BY {primary_key}"
)
)
assert result.rowcount == number_of_rows
result_dict = [
remove_metadata_columns(row) for row in result.mappings().all()
]

# bytea columns are returned as memoryview objects
# we need to convert them to bytes to allow comparison with check_data
for row in result_dict:
for col in row:
if isinstance(row[col], memoryview):
row[col] = bytes(row[col])

assert result_dict == check_data
else:
raise ValueError("Invalid check_data - not dict or list of dicts")
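Two changes here: the nesting is inverted so the count-only case (`primary_key is None or check_data is None`) is handled first as a guard, and the list branch reads rows through `result.mappings().all()`, which returns dict-like `RowMapping` objects directly instead of calling `_asdict()` on each `Row`. A minimal illustration of the mappings API, using an in-memory SQLite database purely for demonstration:

```python
import sqlalchemy

engine = sqlalchemy.create_engine("sqlite://")  # throwaway in-memory DB
with engine.connect() as connection:
    connection.execute(sqlalchemy.text("CREATE TABLE t (id INTEGER, name TEXT)"))
    connection.execute(sqlalchemy.text("INSERT INTO t VALUES (1, 'a'), (2, 'b')"))

    result = connection.execute(sqlalchemy.text("SELECT * FROM t ORDER BY id"))
    rows = result.mappings().all()  # list of dict-like RowMapping objects
    print([dict(row) for row in rows])
    # [{'id': 1, 'name': 'a'}, {'id': 2, 'name': 'b'}]
```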


def test_sqlalchemy_url_config(postgres_config_no_ssl):
"""Be sure that passing a sqlalchemy_url works
@@ -439,7 +435,7 @@ def test_encoded_string_data(postgres_target):
https://www.postgresql.org/docs/current/functions-string.html#:~:text=chr(0)%20is%20disallowed%20because%20text%20data%20types%20cannot%20store%20that%20character.
chr(0) is disallowed because text data types cannot store that character.

Note you will recieve a ValueError: A string literal cannot contain NUL (0x00) characters. Which seems like a reasonable error.
Note you will receive a ValueError: A string literal cannot contain NUL (0x00) characters. Which seems like a reasonable error.
See issue https://github.com/MeltanoLabs/target-postgres/issues/60 for more details.
"""

@@ -499,18 +495,11 @@ def test_anyof(postgres_target):

# Any of nullable array of strings or single string.
# {"anyOf":[{"type":"array","items":{"type":["null","string"]}},{"type":"string"},{"type":"null"}]}
if column.name == "parent_ids":
assert isinstance(column.type, ARRAY)

# Any of nullable string.
# {"anyOf":[{"type":"string"},{"type":"null"}]}
if column.name == "commit_message":
if column.name in ["commit_message", "legacy_id"]:
assert isinstance(column.type, TEXT)

# Any of nullable string or integer.
# {"anyOf":[{"type":"string"},{"type":"integer"},{"type":"null"}]}
if column.name == "legacy_id":
assert isinstance(column.type, TEXT)
elif column.name == "parent_ids":
assert isinstance(column.type, ARRAY)


def test_new_array_column(postgres_target):
Expand Down Expand Up @@ -747,7 +736,7 @@ def test_activate_version_deletes_data_properly(postgres_target):
def test_reserved_keywords(postgres_target):
"""Target should work regardless of column names

Postgres has a number of resereved keywords listed here https://www.postgresql.org/docs/current/sql-keywords-appendix.html.
Postgres has a number of reserved keywords listed here https://www.postgresql.org/docs/current/sql-keywords-appendix.html.
"""
file_name = "reserved_keywords.singer"
singer_file_to_target(file_name, postgres_target)