From 68b9a1ba51c35c63942df0734e960e2d572218a0 Mon Sep 17 00:00:00 2001
From: joocer <justin.joyce@joocer.com>
Date: Wed, 15 Nov 2023 23:16:36 +0000
Subject: [PATCH 1/3] #1261

---
 opteryx/connectors/base/base_connector.py     |  2 +-
 opteryx/connectors/disk_connector.py          |  6 ++-
 opteryx/connectors/sql_connector.py           | 25 +++++++-----
 opteryx/utils/file_decoders.py                | 39 +++++++++----------
 ...py => test_projection_pushdown_parquet.py} | 16 ++++----
 .../storage/test_collection_gcs_firestore.py  |  3 +-
 tests/storage/test_sql_biq_query.py           |  7 ++++
 tests/storage/test_sql_cockroach.py           |  7 ++++
 tests/storage/test_sql_duckdb.py              |  7 ++++
 tests/storage/test_sql_mysql.py               |  7 ++++
 tests/storage/test_sql_sqlite.py              |  7 ++++
 11 files changed, 84 insertions(+), 42 deletions(-)
 rename tests/plan_optimization/{# test_projection_pushdown_parquet.py => test_projection_pushdown_parquet.py} (72%)

diff --git a/opteryx/connectors/base/base_connector.py b/opteryx/connectors/base/base_connector.py
index feb8ba934..a8a9c3e7f 100644
--- a/opteryx/connectors/base/base_connector.py
+++ b/opteryx/connectors/base/base_connector.py
@@ -97,7 +97,7 @@ def chunk_dictset(
             _id = record.pop("_id", None)
             # column selection
             if columns:
-                record = {k: record.get(k) for k in columns}
+                record = {k.name: record.get(k.name) for k in columns}
             record["id"] = None if _id is None else str(_id)
 
             chunk.append(record)
diff --git a/opteryx/connectors/disk_connector.py b/opteryx/connectors/disk_connector.py
index 6c056e036..31d037caf 100644
--- a/opteryx/connectors/disk_connector.py
+++ b/opteryx/connectors/disk_connector.py
@@ -29,6 +29,7 @@
 from opteryx.exceptions import DatasetNotFoundError
 from opteryx.exceptions import EmptyDatasetError
 from opteryx.exceptions import UnsupportedFileTypeError
+from opteryx.utils.arrow import post_read_projector
 from opteryx.utils.file_decoders import VALID_EXTENSIONS
 from opteryx.utils.file_decoders import get_decoder
 
@@ -102,7 +103,10 @@ def read_dataset(self, columns: list = None) -> pyarrow.Table:
         )
 
         if self.cached_first_blob is not None:
-            yield self.cached_first_blob
+            if columns:
+                yield post_read_projector(self.cached_first_blob, columns)
+            else:
+                yield self.cached_first_blob
             blob_names = blob_names[1:]
         self.cached_first_blob = None
 
diff --git a/opteryx/connectors/sql_connector.py b/opteryx/connectors/sql_connector.py
index 6d932e9a9..81c530fe5 100644
--- a/opteryx/connectors/sql_connector.py
+++ b/opteryx/connectors/sql_connector.py
@@ -25,6 +25,7 @@
 from opteryx.connectors.base.base_connector import BaseConnector
 from opteryx.exceptions import MissingDependencyError
 from opteryx.exceptions import UnmetRequirementError
+from opteryx.third_party.query_builder import Query
 
 
 class SqlConnector(BaseConnector):
@@ -55,19 +56,23 @@ def __init__(self, *args, connection: str = None, engine=None, **kwargs):
     def read_dataset(
         self, columns: list = None, chunk_size: int = INITIAL_CHUNK_SIZE
     ) -> "DatasetReader":
-        from sqlalchemy import Table
-        from sqlalchemy import select
-
         self.chunk_size = chunk_size
+        result_schema = self.schema
 
-        # get the schema from the dataset
-        table = Table(self.dataset, self.metadata, autoload_with=self._engine)
-        print("SQL push projection")
-        query = select(table)
-        morsel = DataFrame(schema=self.schema)
+        query_builder = Query().FROM(self.dataset)
+
+        # if we're projecting, update the SQL and the target morsel schema
+        if columns:
+            column_names = [col.name for col in columns]
+            query_builder.add("SELECT", *column_names)
+            result_schema.columns = [col for col in self.schema.columns if col.name in column_names]
+        else:
+            query_builder.add("SELECT", "*")
+
+        morsel = DataFrame(schema=result_schema)
 
         with self._engine.connect() as conn:
-            for row in conn.execute(query):
+            for row in conn.execute(str(query_builder)):
                 morsel._rows.append(row)
                 if len(morsel) == self.chunk_size:
                     yield morsel.arrow()
@@ -75,7 +80,7 @@ def read_dataset(
                     if morsel.nbytes > 0:
                         self.chunk_size = int(len(morsel) // (morsel.nbytes / DEFAULT_MORSEL_SIZE))
 
-                    morsel = DataFrame(schema=self.schema)
+                    morsel = DataFrame(schema=result_schema)
 
         if len(morsel) > 0:
             yield morsel.arrow()
diff --git a/opteryx/utils/file_decoders.py b/opteryx/utils/file_decoders.py
index 2946709ef..8bda41701 100644
--- a/opteryx/utils/file_decoders.py
+++ b/opteryx/utils/file_decoders.py
@@ -24,6 +24,8 @@
 from opteryx.exceptions import UnsupportedFileTypeError
 from opteryx.utils.arrow import post_read_projector
 
+memory_pool = pyarrow.default_memory_pool()
+
 
 class ExtentionType(str, Enum):
     """labels for the file extentions"""
@@ -87,46 +89,43 @@ def zstd_decoder(buffer, projection: List = None, selection=None, just_schema: b
 
 def parquet_decoder(buffer, projection: List = None, selection=None, just_schema: bool = False):
     """
-    Read parquet formatted files
+    Read parquet formatted files.
     """
     from pyarrow import parquet
 
     from opteryx.connectors.capabilities import predicate_pushable
 
-    # parquet uses DNF filters
-    _select = None
-    if selection is not None:
-        _select = predicate_pushable.to_dnf(selection)
+    # Convert the selection to DNF format if applicable
+    _select = predicate_pushable.to_dnf(selection) if selection is not None else None
 
     stream = io.BytesIO(buffer)
 
-    selected_columns = None
-    if isinstance(projection, list) and "*" not in projection or just_schema:
-        # if we have a pushed down projection, get the list of columns from the file
-        # and then only set the reader to read those
+    if projection or just_schema:
+        # Open the parquet file only once
         parquet_file = parquet.ParquetFile(stream)
-        # .schema_arrow appears to be slower than .schema but there are instances of
-        # .schema being incomplete [bug #468] so we pay the extra time for increase reliability
-        arrow_schema = parquet_file.schema_arrow
-        schema_columns = arrow_schema.names
 
+        # Return just the schema if that's all that's needed
         if just_schema:
-            return convert_arrow_schema_to_orso_schema(arrow_schema)
+            return convert_arrow_schema_to_orso_schema(parquet_file.schema_arrow)
 
+        # Special handling for projection of ["count_*"]
         if projection == ["count_*"]:
             return pyarrow.Table.from_pydict(
                 {"_": numpy.full(parquet_file.metadata.num_rows, True, dtype=numpy.bool_)}
             )
 
-        # work out the selected columns, handling aliases -
+        # Projection processing
+        schema_columns_set = set(parquet_file.schema_arrow.names)
         projection_names = {name for proj_col in projection for name in proj_col.all_names}
-        selected_columns = [
-            schema_col for schema_col in schema_columns if schema_col in projection_names
-        ]
+        selected_columns = list(schema_columns_set.intersection(projection_names))
 
-        if len(selected_columns) == 0:  # pragma: no-cover
+        # If no columns are selected, set to None
+        if not selected_columns:
             selected_columns = None
-    # don't prebuffer - we're already buffered as an IO Stream
+    else:
+        selected_columns = None
+
+    # Read the parquet table with the optimized column list and selection filters
     return parquet.read_table(stream, columns=selected_columns, pre_buffer=False, filters=_select)
 
 
diff --git a/tests/plan_optimization/# test_projection_pushdown_parquet.py b/tests/plan_optimization/test_projection_pushdown_parquet.py
similarity index 72%
rename from tests/plan_optimization/# test_projection_pushdown_parquet.py
rename to tests/plan_optimization/test_projection_pushdown_parquet.py
index 9ff7983f9..d875cd733 100644
--- a/tests/plan_optimization/# test_projection_pushdown_parquet.py	
+++ b/tests/plan_optimization/test_projection_pushdown_parquet.py
@@ -16,7 +16,7 @@ def test_parquet_projection_pushdown():
     # with pushdown
     conn = opteryx.connect()
     cur = conn.cursor()
-    cur.execute(f"SELECT MAX(following) FROM testdata.flat.formats.parquet WITH(NO_PARTITION);")
+    cur.execute(f"SELECT following FROM testdata.flat.formats.parquet WITH(NO_PARTITION);")
     cur.arrow()
     assert cur.stats["columns_read"] == 1, cur.stats
 
@@ -29,13 +29,13 @@ def test_parquet_projection_pushdown():
     assert cur.stats["columns_read"] == 2
 
     # with pushdown disabled
-    conn = opteryx.connect()
-    cur = conn.cursor()
-    cur.execute(
-        f"SELECT MAX(following) FROM testdata.flat.formats.parquet WITH(NO_PARTITION, NO_PUSH_PROJECTION);"
-    )
-    [a for a in cur.fetchall()]
-    assert cur.stats["columns_read"] == 13
+    #    conn = opteryx.connect()
+    #    cur = conn.cursor()
+    #    cur.execute(
+    #        f"SELECT MAX(following) FROM testdata.flat.formats.parquet WITH(NO_PARTITION, NO_PUSH_PROJECTION);"
+    #    )
+    #    [a for a in cur.fetchall()]
+    #    assert cur.stats["columns_read"] == 13
 
     # without pushdown
     conn = opteryx.connect()
diff --git a/tests/storage/test_collection_gcs_firestore.py b/tests/storage/test_collection_gcs_firestore.py
index 10d6fd136..7e420e300 100644
--- a/tests/storage/test_collection_gcs_firestore.py
+++ b/tests/storage/test_collection_gcs_firestore.py
@@ -26,8 +26,7 @@ def test_firestore_storage():
     cur.execute("SELECT actor, COUNT(*) FROM dwarves GROUP BY actor;")
     assert cur.rowcount == 6, cur.rowcount
 
-    print("Predicate PushDown Skipped")
-    #    # TEST PREDICATE PUSHDOWN
+    # TEST PREDICATE PUSHDOWN
     #    cur = conn.cursor()
     #    cur.execute("SELECT * FROM dwarves WHERE actor = 'Pinto Colvig';")
     #    # when pushdown is enabled, we only read the matching rows from the source
diff --git a/tests/storage/test_sql_biq_query.py b/tests/storage/test_sql_biq_query.py
index 541c4918e..071bdf401 100644
--- a/tests/storage/test_sql_biq_query.py
+++ b/tests/storage/test_sql_biq_query.py
@@ -24,10 +24,17 @@ def test_bigquery_storage():
 
     results = opteryx.query("SELECT * FROM bq.public.planets")
     assert results.rowcount == 9, results.rowcount
+    assert results.columncount == 20
 
     # PROCESS THE DATA IN SOME WAY
     results = opteryx.query("SELECT COUNT(*) FROM bq.public.planets;")
     assert results.rowcount == 1, results.rowcount
+    assert results.columncount == 1
+
+    # PUSH A PROJECTION
+    results = opteryx.query("SELECT name FROM bq.public.planets;")
+    assert results.rowcount == 9, results.rowcount
+    assert results.columncount == 1
 
 
 if __name__ == "__main__":  # pragma: no cover
diff --git a/tests/storage/test_sql_cockroach.py b/tests/storage/test_sql_cockroach.py
index 97b400c03..e774b15aa 100644
--- a/tests/storage/test_sql_cockroach.py
+++ b/tests/storage/test_sql_cockroach.py
@@ -27,10 +27,17 @@ def test_cockroach_storage():
 
     results = opteryx.query("SELECT * FROM cockroach.planets")
     assert results.rowcount == 9, results.rowcount
+    assert results.columncount == 20
 
     # PROCESS THE DATA IN SOME WAY
     results = opteryx.query("SELECT COUNT(*) FROM cockroach.planets;")
     assert results.rowcount == 1, results.rowcount
+    assert results.columncount == 1
+
+    # PUSH A PROJECTION
+    results = opteryx.query("SELECT name FROM cockroach.planets;")
+    assert results.rowcount == 9, results.rowcount
+    assert results.columncount == 1
 
 
 if __name__ == "__main__":  # pragma: no cover
diff --git a/tests/storage/test_sql_duckdb.py b/tests/storage/test_sql_duckdb.py
index ab6e4da0f..a4b3b11cd 100644
--- a/tests/storage/test_sql_duckdb.py
+++ b/tests/storage/test_sql_duckdb.py
@@ -23,10 +23,17 @@ def test_duckdb_storage():
 
     results = opteryx.query("SELECT * FROM duckdb.planets")
     assert results.rowcount == 9, results.rowcount
+    assert results.columncount == 20
 
     # PROCESS THE DATA IN SOME WAY
     results = opteryx.query("SELECT COUNT(*) FROM duckdb.planets;")
     assert results.rowcount == 1, results.rowcount
+    assert results.columncount == 1
+
+    # PUSH A PROJECTION
+    results = opteryx.query("SELECT name FROM duckdb.planets;")
+    assert results.rowcount == 9, results.rowcount
+    assert results.columncount == 1
 
 
 if __name__ == "__main__":  # pragma: no cover
diff --git a/tests/storage/test_sql_mysql.py b/tests/storage/test_sql_mysql.py
index 9bd96a3e6..a9a168a5d 100644
--- a/tests/storage/test_sql_mysql.py
+++ b/tests/storage/test_sql_mysql.py
@@ -23,10 +23,17 @@ def test_mysql_storage():
 
     results = opteryx.query("SELECT * FROM mysql.planets")
     assert results.rowcount == 9, results.rowcount
+    assert results.columncount == 20
 
     # PROCESS THE DATA IN SOME WAY
     results = opteryx.query("SELECT COUNT(*) FROM mysql.planets;")
     assert results.rowcount == 1, results.rowcount
+    assert results.columncount == 1
+
+    # PUSH A PROJECTION
+    results = opteryx.query("SELECT name FROM mysql.planets;")
+    assert results.rowcount == 9, results.rowcount
+    assert results.columncount == 1
 
 
 if __name__ == "__main__":  # pragma: no cover
diff --git a/tests/storage/test_sql_sqlite.py b/tests/storage/test_sql_sqlite.py
index 97671721c..c9c8ee05d 100644
--- a/tests/storage/test_sql_sqlite.py
+++ b/tests/storage/test_sql_sqlite.py
@@ -20,10 +20,17 @@ def test_sqlite_storage():
 
     results = opteryx.query("SELECT * FROM sqlite.planets")
     assert results.rowcount == 9, results.rowcount
+    assert results.columncount == 20
 
     # PROCESS THE DATA IN SOME WAY
     results = opteryx.query("SELECT COUNT(*) FROM sqlite.planets;")
     assert results.rowcount == 1, results.rowcount
+    assert results.columncount == 1
+
+    # PUSH A PROJECTION
+    results = opteryx.query("SELECT name FROM sqlite.planets;")
+    assert results.rowcount == 9, results.rowcount
+    assert results.columncount == 1
 
 
 test_sqlite_storage()

From 83d1a6a9b8ba1b192b9e8dbbaa432aea45fceb6e Mon Sep 17 00:00:00 2001
From: joocer <justin.joyce@joocer.com>
Date: Wed, 15 Nov 2023 23:22:49 +0000
Subject: [PATCH 2/3] #1261

---
 opteryx/utils/file_decoders.py | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/opteryx/utils/file_decoders.py b/opteryx/utils/file_decoders.py
index 8bda41701..dbea4be6f 100644
--- a/opteryx/utils/file_decoders.py
+++ b/opteryx/utils/file_decoders.py
@@ -144,11 +144,12 @@ def orc_decoder(buffer, projection: List = None, selection=None, just_schema: bo
 
     # work out the selected columns, handling aliases
     selected_columns = []
-    for projection_column in projection:
-        for schema_column in schema_columns:
-            if schema_column in projection_column.all_names:
-                selected_columns.append(schema_column)
-                break
+    if projection:
+        for projection_column in projection:
+            for schema_column in schema_columns:
+                if schema_column in projection_column.all_names:
+                    selected_columns.append(schema_column)
+                    break
     if len(selected_columns) == 0:  # pragma: no-cover
         selected_columns = None
 

From 2470ed76e983cf3cfaefc7ab761e44423589cb32 Mon Sep 17 00:00:00 2001
From: joocer <justin.joyce@joocer.com>
Date: Thu, 16 Nov 2023 12:50:02 +0000
Subject: [PATCH 3/3] #1261

---
 opteryx/__version__.py                              |  2 +-
 tests/sql_battery/test_shapes_and_errors_battery.py | 11 +++++------
 2 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/opteryx/__version__.py b/opteryx/__version__.py
index 618660153..8507a9129 100644
--- a/opteryx/__version__.py
+++ b/opteryx/__version__.py
@@ -17,4 +17,4 @@
 """
 
 # __version__ = "0.4.0-alpha.6"
-__version__ = "0.12.0-beta.1"
+__version__ = "0.12.0-beta.2"
diff --git a/tests/sql_battery/test_shapes_and_errors_battery.py b/tests/sql_battery/test_shapes_and_errors_battery.py
index 720806849..2460ecc9c 100644
--- a/tests/sql_battery/test_shapes_and_errors_battery.py
+++ b/tests/sql_battery/test_shapes_and_errors_battery.py
@@ -1288,17 +1288,16 @@ def test_sql_battery(statement, rows, columns, exception):
         "testdata.partitioned", DiskConnector, partition_scheme=MabelPartitionScheme
     )
 
-    conn = opteryx.connect()
-    cursor = conn.cursor()
     try:
-        cursor.execute(statement)
-        actual_rows, actual_columns = cursor.shape
+        # query to arrow is the fastest way to query
+        result = opteryx.query_to_arrow(statement)
+        actual_rows, actual_columns = result.shape
         assert (
             rows == actual_rows
-        ), f"\n{cursor.display()}\n\033[38;5;203mQuery returned {actual_rows} rows but {rows} were expected.\033[0m\n{statement}"
+        ), f"\n\033[38;5;203mQuery returned {actual_rows} rows but {rows} were expected.\033[0m\n{statement}"
         assert (
             columns == actual_columns
-        ), f"\n{cursor.display()}\n\033[38;5;203mQuery returned {actual_columns} cols but {columns} were expected.\033[0m\n{statement}"
+        ), f"\n\033[38;5;203mQuery returned {actual_columns} cols but {columns} were expected.\033[0m\n{statement}"
     except AssertionError as err:
         raise Exception(err) from err
     except Exception as err: