diff --git a/target_postgres/connector.py b/target_postgres/connector.py index 6ba09b9e..28713e8e 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -17,7 +17,7 @@ import simplejson import sqlalchemy as sa from singer_sdk import SQLConnector -from singer_sdk import typing as th +from singer_sdk.connectors.sql import JSONSchemaToSQL from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, BYTEA, JSONB, UUID from sqlalchemy.engine import URL from sqlalchemy.engine.url import make_url @@ -30,7 +30,6 @@ TEXT, TIME, TIMESTAMP, - VARCHAR, TypeDecorator, ) from sshtunnel import SSHTunnelForwarder @@ -39,6 +38,22 @@ from singer_sdk.connectors.sql import FullyQualifiedName +class JSONSchemaToPostgres(JSONSchemaToSQL): + """Convert JSON Schema types to Postgres types.""" + + def __init__(self, *, content_encoding: bool = True) -> None: + """Initialize the JSONSchemaToPostgres instance.""" + super().__init__() + self.content_encoding = content_encoding + + def handle_raw_string(self, schema): + """Handle a raw string type.""" + if self.content_encoding and schema.get("contentEncoding") == "base16": + return HexByteString() + + return TEXT() + + class PostgresConnector(SQLConnector): """Sets up SQL Alchemy, and other Postgres related stuff.""" @@ -214,7 +229,50 @@ def clone_table( new_table.create(bind=connection) return new_table - def to_sql_type(self, jsonschema_type: dict) -> sa.types.TypeEngine: # type: ignore[override] + def _handle_array_type(self, jsonschema: dict) -> ARRAY | JSONB: + """Handle array type.""" + items = jsonschema.get("items") + # Case 1: items is a string + if isinstance(items, str): + return ARRAY(self.to_sql_type({"type": items})) + + # Case 2: items are more complex + if isinstance(items, dict): + # Case 2.1: items are variants + if "type" not in items: + return ARRAY(JSONB()) + + items_type = items["type"] + + # Case 2.2: items are a single type + if isinstance(items_type, str): + return ARRAY(self.to_sql_type({"type": items_type})) + + # Case 2.3: items are a list of types + if isinstance(items_type, list): + return ARRAY(self.to_sql_type({"type": items_type})) + + # Case 3: tuples + return ARRAY(JSONB()) if isinstance(items, list) else JSONB() + + @cached_property + def jsonschema_to_sql(self) -> JSONSchemaToSQL: + """Return a JSONSchemaToSQL instance with custom type handling.""" + to_sql = JSONSchemaToPostgres(content_encoding=self.interpret_content_encoding) + to_sql.fallback_type = TEXT + to_sql.register_type_handler("integer", BIGINT) + to_sql.register_type_handler("object", JSONB) + to_sql.register_type_handler("array", self._handle_array_type) + to_sql.register_format_handler("date-time", TIMESTAMP) + to_sql.register_format_handler("uuid", UUID) + to_sql.register_format_handler("email", TEXT) + to_sql.register_format_handler("uri", TEXT) + to_sql.register_format_handler("hostname", TEXT) + to_sql.register_format_handler("ipv4", TEXT) + to_sql.register_format_handler("ipv6", TEXT) + return to_sql + + def to_sql_type(self, jsonschema_type: dict) -> sa.types.TypeEngine: """Return a JSON Schema representation of the provided type. By default will call `typing.to_sql_type()`. @@ -270,7 +328,7 @@ def to_sql_type(self, jsonschema_type: dict) -> sa.types.TypeEngine: # type: ig return PostgresConnector.pick_best_sql_type(sql_type_array=sql_type_array) - def pick_individual_type(self, jsonschema_type: dict): # noqa: PLR0911 + def pick_individual_type(self, jsonschema_type: dict): """Select the correct sql type assuming jsonschema_type has only a single type. Args: @@ -281,47 +339,8 @@ def pick_individual_type(self, jsonschema_type: dict): # noqa: PLR0911 """ if "null" in jsonschema_type["type"]: return None - if "integer" in jsonschema_type["type"]: - return BIGINT() - if "object" in jsonschema_type["type"]: - return JSONB() - if "array" in jsonschema_type["type"]: - items = jsonschema_type.get("items") - # Case 1: items is a string - if isinstance(items, str): - return ARRAY(self.to_sql_type({"type": items})) - - # Case 2: items are more complex - if isinstance(items, dict): - # Case 2.1: items are variants - if "type" not in items: - return ARRAY(JSONB()) - - items_type = items["type"] - - # Case 2.2: items are a single type - if isinstance(items_type, str): - return ARRAY(self.to_sql_type({"type": items_type})) - - # Case 2.3: items are a list of types - if isinstance(items_type, list): - return ARRAY(self.to_sql_type({"type": items_type})) - - # Case 3: tuples - return ARRAY(JSONB()) if isinstance(items, list) else JSONB() - - # string formats - if jsonschema_type.get("format") == "date-time": - return TIMESTAMP() - if jsonschema_type.get("format") == "uuid": - return UUID() - if ( - self.interpret_content_encoding - and jsonschema_type.get("contentEncoding") == "base16" - ): - return HexByteString() - individual_type = th.to_sql_type(jsonschema_type) - return TEXT() if isinstance(individual_type, VARCHAR) else individual_type + + return self.jsonschema_to_sql.to_sql_type(jsonschema_type) @staticmethod def pick_best_sql_type(sql_type_array: list):