Commit

joocer committed Jan 9, 2025
1 parent b6b2707 commit b97a85a
Showing 6 changed files with 24 additions and 10 deletions.
6 changes: 3 additions & 3 deletions opteryx/__main__.py
@@ -127,7 +127,7 @@ def main():
         )
         if args.stats:
             print(
-                f"[ {result.rowcount} rows x {result.columncount} columns ] ( {duration/1e9} seconds )"
+                f"[ {result.rowcount} rows x {result.columncount} columns ] ( {duration / 1e9} seconds )"
             )
     except MissingSqlStatement:
         print(
@@ -171,7 +171,7 @@ def main():
             )
             if args.stats:
                 print(
-                    f"[ {result.rowcount} rows x {result.columncount} columns ] ( {duration/1e9} seconds )"
+                    f"[ {result.rowcount} rows x {result.columncount} columns ] ( {duration / 1e9} seconds )"
                 )
         else:
            table = result.arrow()
@@ -197,7 +197,7 @@ def main():
             else:
                 raise ValueError(f"Unknown output format '{ext}'")
             print(
-                f"[ {result.rowcount} rows x {result.columncount} columns ] ( {duration/1e9} seconds )"
+                f"[ {result.rowcount} rows x {result.columncount} columns ] ( {duration / 1e9} seconds )"
             )
             print(f"Written result to '{args.output}'")

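The three __main__.py edits are formatting only: spacing around the division in the stats summary line. The / 1e9 implies the duration is tracked in nanoseconds. A minimal sketch of the same summary line, assuming a monotonic nanosecond timer (row and column figures are placeholders):

    import time

    start = time.monotonic_ns()
    # ... run the statement ...
    duration = time.monotonic_ns() - start  # nanoseconds, hence the / 1e9
    print(f"[ 9 rows x 20 columns ] ( {duration / 1e9} seconds )")
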
14 changes: 12 additions & 2 deletions opteryx/connectors/iceberg_connector.py
@@ -15,14 +15,16 @@
 
 from opteryx.connectors import DiskConnector
 from opteryx.connectors.base.base_connector import BaseConnector
+from opteryx.connectors.capabilities import LimitPushable
 
 
-class IcebergConnector(BaseConnector):
+class IcebergConnector(BaseConnector, LimitPushable):
     __mode__ = "Blob"
-    __type__ = "ARROW"
+    __type__ = "ICEBERG"
 
     def __init__(self, *args, catalog=None, io=DiskConnector, **kwargs):
         BaseConnector.__init__(self, **kwargs)
+        LimitPushable.__init__(self, **kwargs)
 
         self.dataset = self.dataset.lower()
         self.table = catalog.load_table(self.dataset)
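
Alongside the LIMIT work, the connector now identifies as ICEBERG rather than generic ARROW and mixes in LimitPushable so the optimizer can detect the capability (see the limit_pushdown.py change below); the constructor lower-cases the dataset name and loads the table from the supplied catalog. A rough usage sketch, assuming a configured PyIceberg catalog and that register_store forwards the catalog keyword to the connector; the catalog and table names are placeholders:

    import opteryx
    from pyiceberg.catalog import load_catalog
    from opteryx.connectors.iceberg_connector import IcebergConnector

    catalog = load_catalog("default")  # placeholder PyIceberg catalog
    opteryx.register_store("iceberg", IcebergConnector, catalog=catalog)

    # With the connector registered, a trailing LIMIT can now be pushed into the scan.
    rows = opteryx.query("SELECT * FROM iceberg.sales.orders LIMIT 10")
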
@@ -39,6 +41,9 @@ def get_dataset_schema(self) -> RelationSchema:
         return self.schema
 
     def read_dataset(self, columns: list = None, **kwargs) -> pyarrow.Table:
+        rows_read = 0
+        limit = kwargs.get("limit")
+
         if columns is None:
             column_names = self.schema.column_names
         else:
@@ -49,4 +54,9 @@ def read_dataset(self, columns: list = None, **kwargs) -> pyarrow.Table:
         ).to_arrow_batch_reader()
 
         for batch in reader:
+            if limit and rows_read + batch.num_rows > limit:
+                batch = batch.slice(0, limit - rows_read)
             yield pyarrow.Table.from_batches([batch])
+            rows_read += batch.num_rows
+            if limit and rows_read >= limit:
+                break
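
The new read path caps what comes out of the Arrow batch reader once a pushed-down limit is met: the final batch is sliced so the total never exceeds the limit, and iteration stops early. A minimal, standalone sketch of the same pattern; the reader construction and names below are illustrative, not Opteryx internals:

    import pyarrow

    def read_with_limit(reader, limit=None):
        rows_read = 0
        for batch in reader:
            if limit and rows_read + batch.num_rows > limit:
                batch = batch.slice(0, limit - rows_read)  # trim the final batch
            yield pyarrow.Table.from_batches([batch])
            rows_read += batch.num_rows
            if limit and rows_read >= limit:
                break

    # Capping a 10-row table, read in batches of 4, to 3 rows in total.
    table = pyarrow.table({"n": list(range(10))})
    reader = table.to_reader(max_chunksize=4)
    assert sum(t.num_rows for t in read_with_limit(reader, limit=3)) == 3
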
1 change: 1 addition & 0 deletions opteryx/managers/execution/serial_engine.py
@@ -84,6 +84,7 @@ def _inner_explain(node, depth):
     record["time_ms"] = operator.execution_time / 1e6
     record["records_in"] = operator.records_in
     record["records_out"] = operator.records_out
+    record["calls"] = operator.calls
     yield record
     yield from _inner_explain(operator_name[0], depth + 1)
@@ -100,9 +100,9 @@ def test_predicate_compaction():
 
     # After compaction, the upper limit should be '< 7'
     expected_upper_limit = Limit(7, False)  # Assuming exclusive bounds for '<'
-    assert (
-        vr.upper == expected_upper_limit
-    ), f"Expected upper limit to be {expected_upper_limit}, got {vr.upper}"
+    assert vr.upper == expected_upper_limit, (
+        f"Expected upper limit to be {expected_upper_limit}, got {vr.upper}"
+    )
 
 
 test_initialization()
5 changes: 4 additions & 1 deletion opteryx/planner/optimizer/strategies/limit_pushdown.py
@@ -31,7 +31,10 @@ def visit(self, node: LogicalPlanNode, context: OptimizerContext) -> OptimizerContext:
             context.collected_limits.append(node)
             return context
 
-        if node.node_type.Scan and LimitPushable in node.connector.__class__.mro():
+        if (
+            node.node_type == LogicalPlanStepType.Scan
+            and LimitPushable in node.connector.__class__.mro()
+        ):
             for limit_node in context.collected_limits:
                 if node.relation in limit_node.all_relations:
                     self.statistics.optimization_limit_pushdown += 1
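
The old condition, node.node_type.Scan, was an attribute lookup rather than a comparison, so it did not actually test whether the node is a Scan; the fix compares against LogicalPlanStepType.Scan explicitly. The second clause keeps the capability test: a connector opts in to limit pushdown by having LimitPushable in its class hierarchy, which is exactly what the IcebergConnector change above provides. A toy sketch of that mro()-based capability check (class names are stand-ins, not Opteryx's own):

    class LimitPushable:  # marker mixin for the capability
        pass

    class BaseConnector:
        pass

    class IcebergLikeConnector(BaseConnector, LimitPushable):
        pass

    class PlainConnector(BaseConnector):
        pass

    def supports_limit_pushdown(connector) -> bool:
        # Same test the optimizer uses: is the mixin anywhere in the MRO?
        return LimitPushable in connector.__class__.mro()

    assert supports_limit_pushdown(IcebergLikeConnector())
    assert not supports_limit_pushdown(PlainConnector())
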
2 changes: 1 addition & 1 deletion tests/sql_battery/test_shapes_and_errors_battery.py
@@ -1571,7 +1571,7 @@
     ("SELECT * FROM $planets AS P LEFT SEMI JOIN (SELECT id FROM $satellites WHERE name != 'Moon') AS S ON S.id = P.id;", 8, 20, None),
     ("SELECT * FROM $planets AS P LEFT SEMI JOIN $satellites AS S ON S.id = P.id WHERE P.name != 'Earth';", 8, 20, None),
     ("SELECT * FROM GENERATE_SERIES(1, 10) AS G LEFT SEMI JOIN $satellites AS S ON S.id = G;", 10, 1, None),
-    ("EXPLAIN ANALYZE FORMAT JSON SELECT * FROM $planets AS a INNER JOIN (SELECT id FROM $planets) AS b USING (id);", 3, 6, None),
+    ("EXPLAIN ANALYZE FORMAT JSON SELECT * FROM $planets AS a INNER JOIN (SELECT id FROM $planets) AS b USING (id);", 3, 7, None),
     ("SELECT DISTINCT ON (planetId) planetId, name FROM $satellites ", 7, 2, None),
     ("SELECT 8 DIV 4", 1, 1, None),
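The battery entries appear to be (statement, expected rows, expected columns, expected error) tuples; the EXPLAIN ANALYZE expectation moves from 6 to 7 columns, consistent with the calls field added to the explain records in serial_engine.py above. A hypothetical shape check in that format, assuming opteryx.query() exposes the same rowcount/columncount attributes used in __main__.py:

    import opteryx

    statement, expected_rows, expected_columns = (
        "EXPLAIN ANALYZE FORMAT JSON SELECT * FROM $planets AS a "
        "INNER JOIN (SELECT id FROM $planets) AS b USING (id);",
        3,
        7,
    )
    result = opteryx.query(statement)
    assert (result.rowcount, result.columncount) == (expected_rows, expected_columns)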
