Skip to content

Commit

Permalink
Merge pull request #2166 from mabel-dev/#2165
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer authored Dec 29, 2024
2 parents 570f50c + 4794004 commit 84be181
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 14 deletions.
2 changes: 1 addition & 1 deletion opteryx/__version__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__build__ = 928
__build__ = 930

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion opteryx/operators/async_read_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def execute(self, morsel, **kwargs) -> Generator:

from opteryx import system_statistics

"""Perform this step, time how long is spent doing work"""
# Perform this step, time how long is spent doing work
orso_schema = self.parameters["schema"]
reader = self.parameters["connector"]

Expand Down
29 changes: 17 additions & 12 deletions opteryx/operators/base_plan_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
import pyarrow
from orso.tools import random_string

from opteryx.config import MORSEL_SIZE
from opteryx import EOS

END = object()


class BasePlanNode:
Expand Down Expand Up @@ -77,31 +79,34 @@ def __call__(self, morsel: pyarrow.Table, join_leg: str) -> Optional[pyarrow.Tab
try:
# Time the production of the next result
start_time = time.monotonic_ns()
result = next(generator) # Retrieve the next item from the generator
result = next(generator, END) # Retrieve the next item from the generator
execution_time = time.monotonic_ns() - start_time
self.execution_time += execution_time
self.statistics.increase("time_" + self.name.lower(), execution_time)

# Update metrics for valid results
if result == END:
# Break the loop when the generator is exhausted
if not at_least_one:
yield empty_morsel
break

if hasattr(result, "num_rows"):
self.records_out += result.num_rows
self.bytes_out += result.nbytes

if empty_morsel is None:
empty_morsel = result.slice(0, 0)

# if we get empty sets, don't yield them unless they're the only one
if result.num_rows > 0:
self.statistics.avoided_empty_datasets += 1
at_least_one = True
yield result
else:
empty_morsel = result
else:
yield result

except StopIteration:
# Break the loop when the generator is exhausted
if not at_least_one and empty_morsel is not None:
yield empty_morsel
break
continue

yield result

except Exception as err:
# print(f"Exception {err} in operator", self.name)
raise err
Expand Down

0 comments on commit 84be181

Please sign in to comment.