Skip to content

Commit

Permalink
Merge pull request #2168 from mabel-dev/HOUSEKEEPING/15
Browse files Browse the repository at this point in the history
Housekeeping/15
  • Loading branch information
joocer authored Dec 29, 2024
2 parents 84be181 + 6466b85 commit a23af30
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 265 deletions.
2 changes: 1 addition & 1 deletion opteryx/__version__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__build__ = 930
__build__ = 931

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
16 changes: 8 additions & 8 deletions opteryx/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,18 @@ def parse_yaml(yaml_str: str) -> dict:
def line_value(value: str) -> typing.Any:
value = value.strip()
if value.isdigit():
value = int(value)
elif value.replace(".", "", 1).isdigit():
value = float(value)
elif value.lower() == "true":
return int(value)
if value.replace(".", "", 1).isdigit():
return float(value)
if value.lower() == "true":
return True
elif value.lower() == "false":
if value.lower() == "false":
return False
elif value.lower() == "none":
if value.lower() == "none":
return None
elif value.startswith("["):
if value.startswith("["):
return [val.strip() for val in value[1:-1].split(",")]
elif value.startswith("-"):
if value.startswith("-"):
return [val.strip() for val in value.split("-") if val.strip()]
return value

Expand Down
34 changes: 17 additions & 17 deletions opteryx/managers/expression/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,29 +53,29 @@ class NodeType(int, Enum):
# fmt:off

# 00000000
UNKNOWN: int = 0
UNKNOWN = 0

# LOGICAL OPERATORS
# 0001 nnnn
AND: int = 17 # 0001 0001
OR: int = 18 # 0001 0010
XOR: int = 19 # 0001 0011
NOT: int = 20 # 0001 0100
AND = 17 # 0001 0001
OR = 18 # 0001 0010
XOR = 19 # 0001 0011
NOT = 20 # 0001 0100

# INTERNAL IDENTIFIERS
# 0010 nnnn
WILDCARD: int = 33 # 0010 0001
COMPARISON_OPERATOR: int = 34 # 0010 0010
BINARY_OPERATOR: int = 35 # 0010 0011
UNARY_OPERATOR: int = 36 # 0010 0100
FUNCTION: int = 37 # 0010 0101
IDENTIFIER: int = 38 # 0010 0110
SUBQUERY: int = 39 # 0010 0111
NESTED: int = 40 # 0010 1000
AGGREGATOR:int = 41 # 0010 1001
LITERAL:int = 42 # 0010 1010
EXPRESSION_LIST: int = 43 # 0010 1011 (CASE WHEN)
EVALUATED: int = 44 # 0010 1100 - memoize results
WILDCARD = 33 # 0010 0001
COMPARISON_OPERATOR = 34 # 0010 0010
BINARY_OPERATOR = 35 # 0010 0011
UNARY_OPERATOR = 36 # 0010 0100
FUNCTION = 37 # 0010 0101
IDENTIFIER = 38 # 0010 0110
SUBQUERY = 39 # 0010 0111
NESTED = 40 # 0010 1000
AGGREGATOR = 41 # 0010 1001
LITERAL = 42 # 0010 1010
EXPRESSION_LIST = 43 # 0010 1011 (CASE WHEN)
EVALUATED = 44 # 0010 1100 - memoize results


ORSO_TO_NUMPY_MAP = {
Expand Down
2 changes: 0 additions & 2 deletions opteryx/planner/physical_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@ def create_physical_plan(logical_plan, query_properties) -> PhysicalPlan:
raise UnsupportedSyntaxError(f"Unsupported SHOW type '{node_config['object_type']}'")
elif node_type == LogicalPlanStepType.ShowColumns:
node = operators.ShowColumnsNode(query_properties, **node_config)
elif node_type == LogicalPlanStepType.Subquery:
node = operators.NoOpNode(query_properties, **node_config)
elif node_type == LogicalPlanStepType.Union:
node = operators.UnionNode(query_properties, **node_config)
else: # pragma: no cover
Expand Down
26 changes: 9 additions & 17 deletions opteryx/utils/file_decoders.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from opteryx.managers.expression import NodeType
from opteryx.managers.expression import get_all_nodes_of_type
from opteryx.utils.arrow import post_read_projector
from opteryx.utils.memory_view_stream import MemoryViewStream


class ExtentionType(str, Enum):
Expand Down Expand Up @@ -163,8 +162,7 @@ def zstd_decoder(
"""
import zstandard

stream: BinaryIO = None
stream = MemoryViewStream(buffer) if isinstance(buffer, memoryview) else io.BytesIO(buffer)
stream: BinaryIO = io.BytesIO(buffer)

with zstandard.open(stream, "rb") as file:
return jsonl_decoder(
Expand All @@ -185,8 +183,7 @@ def lzma_decoder(
"""
import lzma

stream: BinaryIO = None
stream = MemoryViewStream(buffer) if isinstance(buffer, memoryview) else io.BytesIO(buffer)
stream: BinaryIO = io.BytesIO(buffer)

with lzma.open(stream, "rb") as file:
return jsonl_decoder(
Expand Down Expand Up @@ -285,8 +282,7 @@ def orc_decoder(
"""
import pyarrow.orc as orc

stream: BinaryIO = None
stream = MemoryViewStream(buffer) if isinstance(buffer, memoryview) else io.BytesIO(buffer)
stream: BinaryIO = io.BytesIO(buffer)
orc_file = orc.ORCFile(stream)

if just_schema:
Expand All @@ -303,7 +299,7 @@ def orc_decoder(


def jsonl_decoder(
buffer: Union[memoryview, bytes],
buffer: Union[memoryview, bytes, BinaryIO],
*,
projection: Optional[list] = None,
selection: Optional[list] = None,
Expand All @@ -317,7 +313,7 @@ def jsonl_decoder(
rows = []

if not isinstance(buffer, bytes):
buffer = buffer.read()
buffer = buffer.read() # type: ignore

for line in buffer.split(b"\n"):
if not line:
Expand Down Expand Up @@ -354,8 +350,7 @@ def csv_decoder(
import pyarrow.csv
from pyarrow.csv import ParseOptions

stream: BinaryIO = None
stream = MemoryViewStream(buffer) if isinstance(buffer, memoryview) else io.BytesIO(buffer)
stream: BinaryIO = io.BytesIO(buffer)
parse_options = ParseOptions(delimiter=delimiter, newlines_in_values=True)
table = pyarrow.csv.read_csv(stream, parse_options=parse_options)
schema = table.schema
Expand Down Expand Up @@ -415,8 +410,7 @@ def arrow_decoder(
) -> Tuple[int, int, pyarrow.Table]:
import pyarrow.feather as pf

stream: BinaryIO = None
stream = MemoryViewStream(buffer) if isinstance(buffer, memoryview) else io.BytesIO(buffer)
stream: BinaryIO = io.BytesIO(buffer)
table = pf.read_table(stream)
schema = table.schema
if just_schema:
Expand Down Expand Up @@ -454,8 +448,7 @@ def avro_decoder(

raise MissingDependencyError("fastavro")

stream: BinaryIO = None
stream = MemoryViewStream(buffer) if isinstance(buffer, memoryview) else io.BytesIO(buffer)
stream: BinaryIO = io.BytesIO(buffer)
reader = fastavro.reader(stream)

if just_schema:
Expand Down Expand Up @@ -496,8 +489,7 @@ def ipc_decoder(

from pyarrow import ipc

stream: BinaryIO = None
stream = MemoryViewStream(buffer) if isinstance(buffer, memoryview) else io.BytesIO(buffer)
stream: BinaryIO = io.BytesIO(buffer)
reader = ipc.open_stream(stream)

batch_one = next(reader, None)
Expand Down
101 changes: 0 additions & 101 deletions opteryx/utils/memory_view_stream.py

This file was deleted.

Loading

0 comments on commit a23af30

Please sign in to comment.