Skip to content

Commit

Permalink
Merge pull request #1266 from mabel-dev/#1261-Part5
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer authored Nov 16, 2023
2 parents 55975fa + 2470ed7 commit 4ee410f
Show file tree
Hide file tree
Showing 13 changed files with 96 additions and 54 deletions.
2 changes: 1 addition & 1 deletion opteryx/__version__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@
"""

# __version__ = "0.4.0-alpha.6"
__version__ = "0.12.0-beta.1"
__version__ = "0.12.0-beta.2"
2 changes: 1 addition & 1 deletion opteryx/connectors/base/base_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def chunk_dictset(
_id = record.pop("_id", None)
# column selection
if columns:
record = {k: record.get(k) for k in columns}
record = {k.name: record.get(k.name) for k in columns}
record["id"] = None if _id is None else str(_id)

chunk.append(record)
Expand Down
6 changes: 5 additions & 1 deletion opteryx/connectors/disk_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from opteryx.exceptions import DatasetNotFoundError
from opteryx.exceptions import EmptyDatasetError
from opteryx.exceptions import UnsupportedFileTypeError
from opteryx.utils.arrow import post_read_projector
from opteryx.utils.file_decoders import VALID_EXTENSIONS
from opteryx.utils.file_decoders import get_decoder

Expand Down Expand Up @@ -102,7 +103,10 @@ def read_dataset(self, columns: list = None) -> pyarrow.Table:
)

if self.cached_first_blob is not None:
yield self.cached_first_blob
if columns:
yield post_read_projector(self.cached_first_blob, columns)
else:
yield self.cached_first_blob
blob_names = blob_names[1:]
self.cached_first_blob = None

Expand Down
25 changes: 15 additions & 10 deletions opteryx/connectors/sql_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from opteryx.connectors.base.base_connector import BaseConnector
from opteryx.exceptions import MissingDependencyError
from opteryx.exceptions import UnmetRequirementError
from opteryx.third_party.query_builder import Query


class SqlConnector(BaseConnector):
Expand Down Expand Up @@ -55,27 +56,31 @@ def __init__(self, *args, connection: str = None, engine=None, **kwargs):
def read_dataset(
self, columns: list = None, chunk_size: int = INITIAL_CHUNK_SIZE
) -> "DatasetReader":
from sqlalchemy import Table
from sqlalchemy import select

self.chunk_size = chunk_size
result_schema = self.schema

# get the schema from the dataset
table = Table(self.dataset, self.metadata, autoload_with=self._engine)
print("SQL push projection")
query = select(table)
morsel = DataFrame(schema=self.schema)
query_builder = Query().FROM(self.dataset)

# if we're projecting, update the SQL and the target morsel schema
if columns:
column_names = [col.name for col in columns]
query_builder.add("SELECT", *column_names)
result_schema.columns = [col for col in self.schema.columns if col.name in column_names]
else:
query_builder.add("SELECT", "*")

morsel = DataFrame(schema=result_schema)

with self._engine.connect() as conn:
for row in conn.execute(query):
for row in conn.execute(str(query_builder)):
morsel._rows.append(row)
if len(morsel) == self.chunk_size:
yield morsel.arrow()

if morsel.nbytes > 0:
self.chunk_size = int(len(morsel) // (morsel.nbytes / DEFAULT_MORSEL_SIZE))

morsel = DataFrame(schema=self.schema)
morsel = DataFrame(schema=result_schema)

if len(morsel) > 0:
yield morsel.arrow()
Expand Down
50 changes: 25 additions & 25 deletions opteryx/utils/file_decoders.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
from opteryx.exceptions import UnsupportedFileTypeError
from opteryx.utils.arrow import post_read_projector

memory_pool = pyarrow.default_memory_pool()


class ExtentionType(str, Enum):
"""labels for the file extentions"""
Expand Down Expand Up @@ -87,46 +89,43 @@ def zstd_decoder(buffer, projection: List = None, selection=None, just_schema: b

def parquet_decoder(buffer, projection: List = None, selection=None, just_schema: bool = False):
"""
Read parquet formatted files
Read parquet formatted files.
"""
from pyarrow import parquet

from opteryx.connectors.capabilities import predicate_pushable

# parquet uses DNF filters
_select = None
if selection is not None:
_select = predicate_pushable.to_dnf(selection)
# Convert the selection to DNF format if applicable
_select = predicate_pushable.to_dnf(selection) if selection is not None else None

stream = io.BytesIO(buffer)

selected_columns = None
if isinstance(projection, list) and "*" not in projection or just_schema:
# if we have a pushed down projection, get the list of columns from the file
# and then only set the reader to read those
if projection or just_schema:
# Open the parquet file only once
parquet_file = parquet.ParquetFile(stream)
# .schema_arrow appears to be slower than .schema but there are instances of
        # .schema being incomplete [bug #468] so we pay the extra time for increased reliability
arrow_schema = parquet_file.schema_arrow
schema_columns = arrow_schema.names

# Return just the schema if that's all that's needed
if just_schema:
return convert_arrow_schema_to_orso_schema(arrow_schema)
return convert_arrow_schema_to_orso_schema(parquet_file.schema_arrow)

# Special handling for projection of ["count_*"]
if projection == ["count_*"]:
return pyarrow.Table.from_pydict(
{"_": numpy.full(parquet_file.metadata.num_rows, True, dtype=numpy.bool_)}
)

# work out the selected columns, handling aliases -
# Projection processing
schema_columns_set = set(parquet_file.schema_arrow.names)
projection_names = {name for proj_col in projection for name in proj_col.all_names}
selected_columns = [
schema_col for schema_col in schema_columns if schema_col in projection_names
]
selected_columns = list(schema_columns_set.intersection(projection_names))

if len(selected_columns) == 0: # pragma: no-cover
# If no columns are selected, set to None
if not selected_columns:
selected_columns = None
# don't prebuffer - we're already buffered as an IO Stream
else:
selected_columns = None

# Read the parquet table with the optimized column list and selection filters
return parquet.read_table(stream, columns=selected_columns, pre_buffer=False, filters=_select)


Expand All @@ -145,11 +144,12 @@ def orc_decoder(buffer, projection: List = None, selection=None, just_schema: bo

# work out the selected columns, handling aliases
selected_columns = []
for projection_column in projection:
for schema_column in schema_columns:
if schema_column in projection_column.all_names:
selected_columns.append(schema_column)
break
if projection:
for projection_column in projection:
for schema_column in schema_columns:
if schema_column in projection_column.all_names:
selected_columns.append(schema_column)
break
if len(selected_columns) == 0: # pragma: no-cover
selected_columns = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def test_parquet_projection_pushdown():
# with pushdown
conn = opteryx.connect()
cur = conn.cursor()
cur.execute(f"SELECT MAX(following) FROM testdata.flat.formats.parquet WITH(NO_PARTITION);")
cur.execute(f"SELECT following FROM testdata.flat.formats.parquet WITH(NO_PARTITION);")
cur.arrow()
assert cur.stats["columns_read"] == 1, cur.stats

Expand All @@ -29,13 +29,13 @@ def test_parquet_projection_pushdown():
assert cur.stats["columns_read"] == 2

# with pushdown disabled
conn = opteryx.connect()
cur = conn.cursor()
cur.execute(
f"SELECT MAX(following) FROM testdata.flat.formats.parquet WITH(NO_PARTITION, NO_PUSH_PROJECTION);"
)
[a for a in cur.fetchall()]
assert cur.stats["columns_read"] == 13
# conn = opteryx.connect()
# cur = conn.cursor()
# cur.execute(
# f"SELECT MAX(following) FROM testdata.flat.formats.parquet WITH(NO_PARTITION, NO_PUSH_PROJECTION);"
# )
# [a for a in cur.fetchall()]
# assert cur.stats["columns_read"] == 13

# without pushdown
conn = opteryx.connect()
Expand Down
11 changes: 5 additions & 6 deletions tests/sql_battery/test_shapes_and_errors_battery.py
Original file line number Diff line number Diff line change
Expand Up @@ -1288,17 +1288,16 @@ def test_sql_battery(statement, rows, columns, exception):
"testdata.partitioned", DiskConnector, partition_scheme=MabelPartitionScheme
)

conn = opteryx.connect()
cursor = conn.cursor()
try:
cursor.execute(statement)
actual_rows, actual_columns = cursor.shape
# query to arrow is the fastest way to query
result = opteryx.query_to_arrow(statement)
actual_rows, actual_columns = result.shape
assert (
rows == actual_rows
), f"\n{cursor.display()}\n\033[38;5;203mQuery returned {actual_rows} rows but {rows} were expected.\033[0m\n{statement}"
), f"\n\033[38;5;203mQuery returned {actual_rows} rows but {rows} were expected.\033[0m\n{statement}"
assert (
columns == actual_columns
), f"\n{cursor.display()}\n\033[38;5;203mQuery returned {actual_columns} cols but {columns} were expected.\033[0m\n{statement}"
), f"\n\033[38;5;203mQuery returned {actual_columns} cols but {columns} were expected.\033[0m\n{statement}"
except AssertionError as err:
raise Exception(err) from err
except Exception as err:
Expand Down
3 changes: 1 addition & 2 deletions tests/storage/test_collection_gcs_firestore.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ def test_firestore_storage():
cur.execute("SELECT actor, COUNT(*) FROM dwarves GROUP BY actor;")
assert cur.rowcount == 6, cur.rowcount

print("Predicate PushDown Skipped")
# # TEST PREDICATE PUSHDOWN
# TEST PREDICATE PUSHDOWN
# cur = conn.cursor()
# cur.execute("SELECT * FROM dwarves WHERE actor = 'Pinto Colvig';")
# # when pushdown is enabled, we only read the matching rows from the source
Expand Down
7 changes: 7 additions & 0 deletions tests/storage/test_sql_biq_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,17 @@ def test_bigquery_storage():

results = opteryx.query("SELECT * FROM bq.public.planets")
assert results.rowcount == 9, results.rowcount
assert results.columncount == 20

# PROCESS THE DATA IN SOME WAY
results = opteryx.query("SELECT COUNT(*) FROM bq.public.planets;")
assert results.rowcount == 1, results.rowcount
assert results.columncount == 1

# PUSH A PROJECTION
results = opteryx.query("SELECT name FROM bq.public.planets;")
assert results.rowcount == 9, results.rowcount
assert results.columncount == 1


if __name__ == "__main__": # pragma: no cover
Expand Down
7 changes: 7 additions & 0 deletions tests/storage/test_sql_cockroach.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,17 @@ def test_cockroach_storage():

results = opteryx.query("SELECT * FROM cockroach.planets")
assert results.rowcount == 9, results.rowcount
assert results.columncount == 20

# PROCESS THE DATA IN SOME WAY
results = opteryx.query("SELECT COUNT(*) FROM cockroach.planets;")
assert results.rowcount == 1, results.rowcount
assert results.columncount == 1

# PUSH A PROJECTION
results = opteryx.query("SELECT name FROM cockroach.planets;")
assert results.rowcount == 9, results.rowcount
assert results.columncount == 1


if __name__ == "__main__": # pragma: no cover
Expand Down
7 changes: 7 additions & 0 deletions tests/storage/test_sql_duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,17 @@ def test_duckdb_storage():

results = opteryx.query("SELECT * FROM duckdb.planets")
assert results.rowcount == 9, results.rowcount
assert results.columncount == 20

# PROCESS THE DATA IN SOME WAY
results = opteryx.query("SELECT COUNT(*) FROM duckdb.planets;")
assert results.rowcount == 1, results.rowcount
assert results.columncount == 1

# PUSH A PROJECTION
results = opteryx.query("SELECT name FROM duckdb.planets;")
assert results.rowcount == 9, results.rowcount
assert results.columncount == 1


if __name__ == "__main__": # pragma: no cover
Expand Down
7 changes: 7 additions & 0 deletions tests/storage/test_sql_mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,17 @@ def test_mysql_storage():

results = opteryx.query("SELECT * FROM mysql.planets")
assert results.rowcount == 9, results.rowcount
assert results.columncount == 20

# PROCESS THE DATA IN SOME WAY
results = opteryx.query("SELECT COUNT(*) FROM mysql.planets;")
assert results.rowcount == 1, results.rowcount
assert results.columncount == 1

# PUSH A PROJECTION
results = opteryx.query("SELECT name FROM mysql.planets;")
assert results.rowcount == 9, results.rowcount
assert results.columncount == 1


if __name__ == "__main__": # pragma: no cover
Expand Down
7 changes: 7 additions & 0 deletions tests/storage/test_sql_sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,17 @@ def test_sqlite_storage():

results = opteryx.query("SELECT * FROM sqlite.planets")
assert results.rowcount == 9, results.rowcount
assert results.columncount == 20

# PROCESS THE DATA IN SOME WAY
results = opteryx.query("SELECT COUNT(*) FROM sqlite.planets;")
assert results.rowcount == 1, results.rowcount
assert results.columncount == 1

# PUSH A PROJECTION
results = opteryx.query("SELECT name FROM sqlite.planets;")
assert results.rowcount == 9, results.rowcount
assert results.columncount == 1


test_sqlite_storage()
Expand Down

0 comments on commit 4ee410f

Please sign in to comment.