From 0878b9069dfee25c40c461b40d9bf5d5d65f6edb Mon Sep 17 00:00:00 2001 From: joocer Date: Sun, 29 Dec 2024 18:55:38 +0000 Subject: [PATCH 1/2] #2165 --- opteryx/operators/async_read_node.py | 2 +- opteryx/operators/base_plan_node.py | 29 ++++++++++++++++------------ 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/opteryx/operators/async_read_node.py b/opteryx/operators/async_read_node.py index 4db5ea946..fdaf3fe0a 100644 --- a/opteryx/operators/async_read_node.py +++ b/opteryx/operators/async_read_node.py @@ -78,7 +78,7 @@ def execute(self, morsel, **kwargs) -> Generator: from opteryx import system_statistics - """Perform this step, time how long is spent doing work""" + # Perform this step, time how long is spent doing work orso_schema = self.parameters["schema"] reader = self.parameters["connector"] diff --git a/opteryx/operators/base_plan_node.py b/opteryx/operators/base_plan_node.py index a42c66e75..809594cb6 100644 --- a/opteryx/operators/base_plan_node.py +++ b/opteryx/operators/base_plan_node.py @@ -10,7 +10,9 @@ import pyarrow from orso.tools import random_string -from opteryx.config import MORSEL_SIZE +from opteryx import EOS + +END = object() class BasePlanNode: @@ -77,31 +79,34 @@ def __call__(self, morsel: pyarrow.Table, join_leg: str) -> Optional[pyarrow.Tab try: # Time the production of the next result start_time = time.monotonic_ns() - result = next(generator) # Retrieve the next item from the generator + result = next(generator, END) # Retrieve the next item from the generator execution_time = time.monotonic_ns() - start_time self.execution_time += execution_time self.statistics.increase("time_" + self.name.lower(), execution_time) # Update metrics for valid results + if result == END: + # Break the loop when the generator is exhausted + if not at_least_one: + yield empty_morsel + break + if hasattr(result, "num_rows"): self.records_out += result.num_rows self.bytes_out += result.nbytes + if empty_morsel is None: + empty_morsel = result.slice(0, 0) + # if we get empty sets, don't yield them unless they're the only one if result.num_rows > 0: self.statistics.avoided_empty_datasets += 1 at_least_one = True yield result - else: - empty_morsel = result - else: - yield result - - except StopIteration: - # Break the loop when the generator is exhausted - if not at_least_one and empty_morsel is not None: - yield empty_morsel - break + continue + + yield result + except Exception as err: # print(f"Exception {err} in operator", self.name) raise err From 4794004bc0b28e864ea9535b9a44315a91eff248 Mon Sep 17 00:00:00 2001 From: XB500 Date: Sun, 29 Dec 2024 18:56:03 +0000 Subject: [PATCH 2/2] Opteryx Version 0.19.0-alpha.930 --- opteryx/__version__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opteryx/__version__.py b/opteryx/__version__.py index 1cb380ea2..9a67dbbdd 100644 --- a/opteryx/__version__.py +++ b/opteryx/__version__.py @@ -1,4 +1,4 @@ -__build__ = 928 +__build__ = 930 # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License.