Skip to content

Commit

Permalink
Merge pull request #2155 from mabel-dev/#2151
Browse files Browse the repository at this point in the history
joocer authored Dec 24, 2024
2 parents e73533a + 9a35da1 commit 065b389
Showing 9 changed files with 5 additions and 91 deletions.
2 changes: 1 addition & 1 deletion opteryx/__version__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__build__ = 920
__build__ = 921

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
1 change: 0 additions & 1 deletion opteryx/operators/__init__.py
Original file line number Diff line number Diff line change
@@ -6,7 +6,6 @@
# Distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND.


from .base_plan_node import BasePlanDataObject # isort: skip
from .base_plan_node import BasePlanNode, JoinNode # isort: skip

from .aggregate_and_group_node import AggregateAndGroupNode # Group is always followed by aggregate
16 changes: 0 additions & 16 deletions opteryx/operators/aggregate_and_group_node.py
Original file line number Diff line number Diff line change
@@ -14,8 +14,6 @@
"""

from dataclasses import dataclass

import numpy
import pyarrow
from orso.types import OrsoTypes
@@ -28,22 +26,10 @@
from opteryx.operators.aggregate_node import build_aggregations
from opteryx.operators.aggregate_node import extract_evaluations
from opteryx.operators.aggregate_node import project
from opteryx.operators.base_plan_node import BasePlanDataObject

from . import BasePlanNode


@dataclass
class AggregateAndGroupDataObject(BasePlanDataObject):
groups: list = None
aggregates: list = None
all_identifiers: list = None
evaluatable_nodes: list = None
group_by_columns: list = None
column_map: list = None
aggregate_functions: list = None


class AggregateAndGroupNode(BasePlanNode):
def __init__(self, properties: QueryProperties, **parameters):
BasePlanNode.__init__(self, properties=properties, **parameters)
@@ -79,8 +65,6 @@ def __init__(self, properties: QueryProperties, **parameters):
self.group_by_columns = list({node.schema_column.identity for node in self.groups})
self.column_map, self.aggregate_functions = build_aggregations(self.aggregates)

self.do = AggregateAndGroupDataObject()

self.buffer = []

@classmethod
17 changes: 4 additions & 13 deletions opteryx/operators/aggregate_node.py
Original file line number Diff line number Diff line change
@@ -11,8 +11,6 @@
This node performs aggregates without performing groupings.
"""

from dataclasses import dataclass

import numpy
import pyarrow

@@ -22,7 +20,6 @@
from opteryx.managers.expression import evaluate_and_append
from opteryx.managers.expression import get_all_nodes_of_type
from opteryx.models import QueryProperties
from opteryx.operators.base_plan_node import BasePlanDataObject

from . import BasePlanNode

@@ -166,16 +163,11 @@ def extract_evaluations(aggregates):
if len(aggregators) == 0:
evaluatable_nodes.append(node)

return evaluatable_nodes

literal_count = len([n for n in evaluatable_nodes if n.node_type == NodeType.LITERAL])
if literal_count > 0 and literal_count < len(evaluatable_nodes):
evaluatable_nodes = [n for n in evaluatable_nodes if n.node_type != NodeType.LITERAL]

@dataclass
class AggregateDataObject(BasePlanDataObject):
aggregates: list = None
all_identifiers: list = None
evaluatable_nodes: list = None
column_map: list = None
aggregate_functions: list = None
return evaluatable_nodes


class AggregateNode(BasePlanNode):
@@ -196,7 +188,6 @@ def __init__(self, properties: QueryProperties, **parameters):

self.column_map, self.aggregate_functions = build_aggregations(self.aggregates)

self.do = AggregateDataObject()
self.buffer = []

@classmethod
8 changes: 0 additions & 8 deletions opteryx/operators/async_read_node.py
Original file line number Diff line number Diff line change
@@ -16,7 +16,6 @@
import queue
import threading
import time
from dataclasses import dataclass
from typing import Generator

import aiohttp
@@ -28,7 +27,6 @@
from opteryx import config
from opteryx.exceptions import DataError
from opteryx.models import QueryProperties
from opteryx.operators.base_plan_node import BasePlanDataObject
from opteryx.shared import AsyncMemoryPool
from opteryx.shared import MemoryPool
from opteryx.utils.file_decoders import get_decoder
@@ -61,17 +59,11 @@ async def fetch_and_process(blob_name):
await session.close()


@dataclass
class AsyncReaderDataObject(BasePlanDataObject):
pass


class AsyncReaderNode(ReaderNode):
def __init__(self, properties: QueryProperties, **parameters):
ReaderNode.__init__(self, properties=properties, **parameters)
self.pool = MemoryPool(MAX_READ_BUFFER_CAPACITY, f"ReadBuffer <{self.parameters['alias']}>")

self.do = AsyncReaderDataObject()
self.predicates = parameters.get("predicates")

@classmethod
18 changes: 0 additions & 18 deletions opteryx/operators/base_plan_node.py
Original file line number Diff line number Diff line change
@@ -5,28 +5,11 @@


import time
from dataclasses import dataclass
from typing import Optional

import pyarrow
from orso.tools import random_string

from opteryx import EOS


@dataclass
class BasePlanDataObject:
operation: Optional[str] = None
query_id: str = None
identity: str = None

def __post_init__(self):
# Perform actions after initialization
if self.identity is None:
self.identity = random_string()
if self.operation is None:
self.operation = self.__class__.__name__.replace("DataObject", "Node")


class BasePlanNode:
is_join: bool = False
@@ -47,7 +30,6 @@ def __init__(self, *, properties, **parameters):
self.parameters = parameters
self.execution_time = 0
self.identity = random_string()
self.do: Optional[BasePlanDataObject] = None
self.calls = 0
self.records_in = 0
self.bytes_in = 0
11 changes: 0 additions & 11 deletions opteryx/operators/cross_join_node.py
Original file line number Diff line number Diff line change
@@ -12,7 +12,6 @@
here rather than calling the join() functions
"""

from dataclasses import dataclass
from typing import Generator
from typing import Set
from typing import Tuple
@@ -26,7 +25,6 @@
from opteryx.managers.expression import NodeType
from opteryx.models import LogicalColumn
from opteryx.models import QueryProperties
from opteryx.operators.base_plan_node import BasePlanDataObject

from . import JoinNode

@@ -250,15 +248,6 @@ def _chunker(seq_1, seq_2, size):
)


@dataclass
class CrossJoinDataObject(BasePlanDataObject):
source: str = None
_unnest_column: str = None
_unnest_target: str = None
_filters: str = None
_distinct: bool = False


class CrossJoinNode(JoinNode):
"""
Implements a SQL CROSS JOIN
13 changes: 0 additions & 13 deletions opteryx/operators/exit_node.py
Original file line number Diff line number Diff line change
@@ -17,34 +17,21 @@
This node doesn't do any calculations, it is a pure Projection.
"""

from dataclasses import dataclass
from dataclasses import field
from typing import List

from pyarrow import Table

from opteryx import EOS
from opteryx.exceptions import AmbiguousIdentifierError
from opteryx.exceptions import InvalidInternalStateError
from opteryx.models import LogicalColumn
from opteryx.models import QueryProperties
from opteryx.operators.base_plan_node import BasePlanDataObject

from . import BasePlanNode


@dataclass
class ExitDataObject(BasePlanDataObject):
columns: List[LogicalColumn] = field(default_factory=list)


class ExitNode(BasePlanNode):
def __init__(self, properties: QueryProperties, **parameters):
BasePlanNode.__init__(self, properties=properties, **parameters)
self.columns = parameters.get("columns", [])

self.do = ExitDataObject(columns=self.columns)

@classmethod
def from_json(cls, json_obj: str) -> "BasePlanNode": # pragma: no cover
raise NotImplementedError()
10 changes: 0 additions & 10 deletions opteryx/operators/heap_sort_node.py
Original file line number Diff line number Diff line change
@@ -17,8 +17,6 @@
sorting smaller chunks over and over again.
"""

from dataclasses import dataclass

import numpy
import pyarrow
import pyarrow.compute
@@ -27,24 +25,16 @@
from opteryx import EOS
from opteryx.exceptions import ColumnNotFoundError
from opteryx.models import QueryProperties
from opteryx.operators.base_plan_node import BasePlanDataObject

from . import BasePlanNode


@dataclass
class HeapSortDataObject(BasePlanDataObject):
order_by: list = None
limit: int = -1


class HeapSortNode(BasePlanNode):
def __init__(self, properties: QueryProperties, **parameters):
BasePlanNode.__init__(self, properties=properties, **parameters)
self.order_by = parameters.get("order_by", [])
self.limit: int = parameters.get("limit", -1)

self.do = HeapSortDataObject(order_by=self.order_by, limit=self.limit)
self.mapped_order = []
self.table = None

0 comments on commit 065b389

Please sign in to comment.