From fc67b647dbdd2863b2436d14a09322061c994268 Mon Sep 17 00:00:00 2001
From: joocer
Date: Thu, 2 May 2024 22:40:06 +0100
Subject: [PATCH 1/7] HOUSEKEEPING

---
 .../connectors/gcp_cloudstorage_connector.py   |  2 +-
 opteryx/managers/schemes/mabel_partitions.py   |  6 ++---
 opteryx/operators/__init__.py                  |  6 ++---
 ...ync_scanner_node.py => async_read_node.py}  |  4 ++--
 opteryx/operators/base_plan_node.py            |  8 ++++++-
 .../{selection_node.py => filter_node.py}      |  2 +-
 .../{scanner_node.py => read_node.py}          | 24 +++++++++++++++----
 opteryx/planner/temporary_physical_planner.py  |  6 ++---
 8 files changed, 40 insertions(+), 18 deletions(-)
 rename opteryx/operators/{async_scanner_node.py => async_read_node.py} (98%)
 rename opteryx/operators/{selection_node.py => filter_node.py} (98%)
 rename opteryx/operators/{scanner_node.py => read_node.py} (89%)

diff --git a/opteryx/connectors/gcp_cloudstorage_connector.py b/opteryx/connectors/gcp_cloudstorage_connector.py
index 0d534c89b..e8337cdc0 100644
--- a/opteryx/connectors/gcp_cloudstorage_connector.py
+++ b/opteryx/connectors/gcp_cloudstorage_connector.py
@@ -218,7 +218,7 @@ def get_list_of_blob_names(self, *, prefix: str) -> List[str]:
             page_token = blob_data.get("nextPageToken")
             if not page_token:
                 break
-            params["pageToken"] = page_token
+            params = {"pageToken": page_token}
 
         self.blob_list[prefix] = blob_names
         return blob_names

diff --git a/opteryx/managers/schemes/mabel_partitions.py b/opteryx/managers/schemes/mabel_partitions.py
index 04a58592e..cf367cf3e 100644
--- a/opteryx/managers/schemes/mabel_partitions.py
+++ b/opteryx/managers/schemes/mabel_partitions.py
@@ -133,12 +133,12 @@ def _inner(*, timestamp):
                 if as_at is None:
                     continue
                 if is_complete_and_not_invalid(control_blobs, as_at):
-                    blob_names = (blob for blob in blob_names if as_at in blob)
+                    data_blobs = (blob for blob in data_blobs if as_at in blob)
                     break
-                blob_names = [blob for blob in blob_names if as_at not in blob]
+                data_blobs = [blob for blob in data_blobs if as_at not in blob]
                 as_at = None
 
-        return blob_names
+        return data_blobs
 
         start_date = start_date or midnight
         end_date = end_date or midnight.replace(hour=23, minute=59)

diff --git a/opteryx/operators/__init__.py b/opteryx/operators/__init__.py
index f87e08128..31612bfe5 100644
--- a/opteryx/operators/__init__.py
+++ b/opteryx/operators/__init__.py
@@ -15,7 +15,7 @@
 from .aggregate_and_group_node import AggregateAndGroupNode  # Group is always followed by aggregate
 from .aggregate_node import AGGREGATORS
 from .aggregate_node import AggregateNode  # aggregate data
-from .async_scanner_node import AsyncScannerNode
+from .async_read_node import AsyncReaderNode
 
 # from .build_statistics_node import BuildStatisticsNode  # Analyze Tables
 from .cross_join_node import CrossJoinNode  # CROSS JOIN
@@ -34,8 +34,8 @@
 from .morsel_defragment_node import MorselDefragmentNode  # consolidate small morsels
 from .noop_node import NoOpNode  # No Operation
 from .projection_node import ProjectionNode  # remove unwanted columns including renames
-from .scanner_node import ScannerNode
-from .selection_node import SelectionNode  # filter unwanted rows
+from .read_node import ReaderNode
+from .filter_node import FilterNode  # filter unwanted rows
 from .set_variable_node import SetVariableNode
 from .show_columns_node import ShowColumnsNode  # column details

diff --git a/opteryx/operators/async_scanner_node.py b/opteryx/operators/async_read_node.py
similarity index 98%
rename from opteryx/operators/async_scanner_node.py
rename to opteryx/operators/async_read_node.py
index 75b2b641c..0165cb75f 100644
--- a/opteryx/operators/async_scanner_node.py
+++ b/opteryx/operators/async_read_node.py
@@ -29,7 +29,7 @@
 import pyarrow.parquet
 from orso.schema import RelationSchema
 
-from opteryx.operators.scanner_node import ScannerNode
+from opteryx.operators.read_node import ReaderNode
 from opteryx.shared import AsyncMemoryPool
 from opteryx.shared import MemoryPool
 from opteryx.utils.file_decoders import get_decoder
@@ -95,7 +95,7 @@ async def fetch_and_process(blob_name):
         await session.close()
 
 
-class AsyncScannerNode(ScannerNode):
+class AsyncReaderNode(ReaderNode):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
 

diff --git a/opteryx/operators/base_plan_node.py b/opteryx/operators/base_plan_node.py
index f826bc32f..220c2f94a 100644
--- a/opteryx/operators/base_plan_node.py
+++ b/opteryx/operators/base_plan_node.py
@@ -19,10 +19,16 @@
 from opteryx.models import QueryProperties
 from opteryx.models import QueryStatistics
 
-
 class BasePlanNode:
     _producers = None
 
+    def to_dict(self) -> dict:
+        raise NotImplementedError()
+
+    @classmethod
+    def from_dict(cls, dic: dict) -> "BasePlanNode":
+        raise NotImplementedError()
+
     def __init__(self, properties: QueryProperties, **parameters):
         """
         This is the base class for nodes in the execution plan.

diff --git a/opteryx/operators/selection_node.py b/opteryx/operators/filter_node.py
similarity index 98%
rename from opteryx/operators/selection_node.py
rename to opteryx/operators/filter_node.py
index ac949b3ff..1a5007b15 100644
--- a/opteryx/operators/selection_node.py
+++ b/opteryx/operators/filter_node.py
@@ -33,7 +33,7 @@
 from opteryx.operators import BasePlanNode
 
 
-class SelectionNode(BasePlanNode):
+class FilterNode(BasePlanNode):
     def __init__(self, properties: QueryProperties, **config):
         super().__init__(properties=properties)
         self.filter = config.get("filter")

diff --git a/opteryx/operators/scanner_node.py b/opteryx/operators/read_node.py
similarity index 89%
rename from opteryx/operators/scanner_node.py
rename to opteryx/operators/read_node.py
index 704b4a500..8469100ff 100644
--- a/opteryx/operators/scanner_node.py
+++ b/opteryx/operators/read_node.py
@@ -65,7 +65,7 @@ def normalize_morsel(schema: RelationSchema, morsel: pyarrow.Table) -> pyarrow.Table:
     return morsel.select([col.identity for col in schema.columns])
 
 
-class ScannerNode(BasePlanNode):
+class ReaderNode(BasePlanNode):
     def __init__(self, properties: QueryProperties, **parameters):
         super().__init__(properties=properties, **parameters)
         self.start_date = parameters.get("start_date")
@@ -74,13 +74,29 @@ def __init__(self, properties: QueryProperties, **parameters):
         self.columns = parameters.get("columns", [])
         self.predicates = parameters.get("predicates", [])
 
+        self.connector = parameters.get("connector")
+        self.schema = parameters.get("schema")
+
         if len(self.hints) != 0:
             self.statistics.add_message("All HINTS are currently ignored")
 
+    def to_dict(self) -> dict:
+        return {
+            "identity": f"read-{self.identity}",
+            "operator": "ReadNode",
+            "schema": self.columns,
+            "projection": self.columns,
+            "filters": self.predicates,
+        }
+
+    @classmethod
+    def from_dict(cls, dic: dict) -> "BasePlanNode":
+        raise NotImplementedError()
+
     @property
     def name(self):  # pragma: no cover
         """friendly name for this step"""
-        return "Scan"
+        return "Read"
 
     @property  # pragma: no cover
     def config(self):
@@ -103,7 +119,7 @@ def config(self):
     def execute(self) -> Generator:
         """Perform this step, time how long is spent doing work"""
         morsel = None
-        orso_schema = self.parameters["schema"]
+        orso_schema = self.schema
         orso_schema_cols = []
         for col in orso_schema.columns:
             if col.identity in [c.identity for c in self.columns]:
@@ -111,7 +127,7 @@ def execute(self) -> Generator:
         orso_schema.columns = orso_schema_cols
         arrow_schema = None
         start_clock = time.monotonic_ns()
-        reader = self.parameters["connector"].read_dataset(
+        reader = self.connector.read_dataset(
             columns=self.columns, predicates=self.predicates
         )
         for morsel in reader:

diff --git a/opteryx/planner/temporary_physical_planner.py b/opteryx/planner/temporary_physical_planner.py
index f0523674c..149fd7ae3 100644
--- a/opteryx/planner/temporary_physical_planner.py
+++ b/opteryx/planner/temporary_physical_planner.py
@@ -46,7 +46,7 @@ def create_physical_plan(logical_plan, query_properties):
         elif node_type == LogicalPlanStepType.Explain:
             node = operators.ExplainNode(query_properties, **node_config)
         elif node_type == LogicalPlanStepType.Filter:
-            node = operators.SelectionNode(query_properties, filter=node_config["condition"])
+            node = operators.FilterNode(query_properties, filter=node_config["condition"])
         elif node_type == LogicalPlanStepType.FunctionDataset:
             node = operators.FunctionDatasetNode(query_properties, **node_config)
         elif node_type == LogicalPlanStepType.Join:
@@ -71,9 +71,9 @@ def create_physical_plan(logical_plan, query_properties):
         elif node_type == LogicalPlanStepType.Scan:
             connector = node_config.get("connector")
             if connector and hasattr(connector, "async_read_blob"):
-                node = operators.AsyncScannerNode(query_properties, **node_config)
+                node = operators.AsyncReaderNode(query_properties, **node_config)
             else:
-                node = operators.ScannerNode(query_properties, **node_config)
+                node = operators.ReaderNode(query_properties, **node_config)
         elif node_type == LogicalPlanStepType.Set:
             node = operators.SetVariableNode(query_properties, **node_config)
         elif node_type == LogicalPlanStepType.Show:
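A note on the pageToken change in gcp_cloudstorage_connector.py above: `params` is initialised to None so the first list request carries no page token, which means the old `params["pageToken"] = page_token` would fail with a TypeError on the first continuation page. A minimal sketch of the repaired loop, for illustration only — `list_blobs` and the response handling are assumptions, though the `session.get(url, headers=headers, timeout=30, params=params)` call is taken from the connector itself:

    # Sketch: GCS blob listing with nextPageToken pagination, assuming a
    # requests-style session. `params` is rebuilt for each follow-up page
    # rather than mutated, because it is None on the first request.
    def list_blobs(session, url: str, headers: dict) -> list:
        blob_names: list = []
        params = None  # no pageToken on the first request
        while True:
            response = session.get(url, headers=headers, timeout=30, params=params)
            blob_data = response.json()
            blob_names.extend(item["name"] for item in blob_data.get("items", []))
            page_token = blob_data.get("nextPageToken")
            if not page_token:
                break
            params = {"pageToken": page_token}  # replace, never subscript None
        return blob_names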
From c4b191b6afdb12e7c7be67f29af192b68ff2fd1a Mon Sep 17 00:00:00 2001
From: joocer
Date: Sat, 4 May 2024 23:20:22 +0100
Subject: [PATCH 2/7] HOUSEKEEPING

---
 opteryx/connectors/capabilities/cacheable.py   | 10 +++----
 opteryx/cursor.py                              |  4 +--
 opteryx/functions/__init__.py                  |  2 +-
 opteryx/managers/schemes/mabel_partitions.py   |  2 +-
 opteryx/managers/serde/physical_plan.py        | 21 +++++++++++++
 opteryx/operators/__init__.py                  |  7 +++--
 opteryx/operators/aggregate_and_group_node.py  |  2 +-
 opteryx/operators/async_read_node.py           |  3 +-
 opteryx/operators/base_plan_node.py            |  3 +-
 ...riter_node.py => metadata_writer_node.py_}  |  0
 ...ent_node.py => morsel_defragment_node.py_}  |  0
 opteryx/operators/read_node.py                 |  8 ++---
 opteryx/operators/sort_node.py                 |  2 +-
 opteryx/shared/memory_pool.py                  | 21 ++++-------
 opteryx/utils/file_decoders.py                 | 25 ++++++++++++++--
 opteryx/utils/paths.py                         |  6 ----
 opteryx/utils/series.py                        |  6 ++--
 .../day_03/as_at_00000000/frame.complete       |  0
 .../day_03/as_at_00000000/tweets-0000.jsonl    |  0
 .../day_04/as_at_11111111/frame.complete       |  0
 .../day_04/as_at_11111111/tweets-0001.jsonl    |  0
 tests/misc/test_memory_pool.py                 |  9 ++++--
 tests/misc/test_memory_pool_async.py           |  1 +
 .../test_shapes_and_errors_battery.py          | 30 +++++++++----------
 24 files changed, 98 insertions(+), 64 deletions(-)
 create mode 100644 opteryx/managers/serde/physical_plan.py
 rename opteryx/operators/{metadata_writer_node.py => metadata_writer_node.py_} (100%)
 rename opteryx/operators/{morsel_defragment_node.py => morsel_defragment_node.py_} (100%)
 rename testdata/partitioned/dated/{year_2020 => year_2024}/month_02/day_03/as_at_00000000/frame.complete (100%)
 rename testdata/partitioned/dated/{year_2020 => year_2024}/month_02/day_03/as_at_00000000/tweets-0000.jsonl (100%)
 rename testdata/partitioned/dated/{year_2020 => year_2024}/month_02/day_04/as_at_11111111/frame.complete (100%)
 rename testdata/partitioned/dated/{year_2020 => year_2024}/month_02/day_04/as_at_11111111/tweets-0001.jsonl (100%)

diff --git a/opteryx/connectors/capabilities/cacheable.py b/opteryx/connectors/capabilities/cacheable.py
index 1cafd17f5..c30f9b38f 100644
--- a/opteryx/connectors/capabilities/cacheable.py
+++ b/opteryx/connectors/capabilities/cacheable.py
@@ -142,11 +142,11 @@ async def wrapper(blob_name: str, statistics, pool: MemoryPool, **kwargs):
         if result is not None:
             statistics.bufferpool_hits += 1
-            ref = await pool.commit(result)
+            ref = await pool.commit(result)  # type: ignore
             while ref is None:
                 await asyncio.sleep(0.1)
                 statistics.stalls_writing_to_read_buffer += 1
-                ref = await pool.commit(result)
+                ref = await pool.commit(result)  # type: ignore
             statistics.bytes_read += len(result)
             return ref
 
@@ -154,11 +154,11 @@ async def wrapper(blob_name: str, statistics, pool: MemoryPool, **kwargs):
         result = remote_cache.get(key)
         if result is not None:
             statistics.remote_cache_hits += 1
-            ref = await pool.commit(result)
+            ref = await pool.commit(result)  # type: ignore
             while ref is None:
                 await asyncio.sleep(0.1)
                 statistics.stalls_writing_to_read_buffer += 1
-                ref = await pool.commit(result)
+                ref = await pool.commit(result)  # type: ignore
             statistics.bytes_read += len(result)
             return ref
 
@@ -171,7 +171,7 @@ async def wrapper(blob_name: str, statistics, pool: MemoryPool, **kwargs):
         # Write the result to caches
         if max_evictions:
             # we set a per-query eviction limit
-            buffer = await pool.read(result)
+            buffer = await pool.read(result)  # type: ignore
             if len(buffer) < buffer_pool.max_cacheable_item_size:
                 evicted = buffer_pool.set(key, buffer)
                 remote_cache.set(key, buffer)

diff --git a/opteryx/cursor.py b/opteryx/cursor.py
index 79e892549..2f1b9ad29 100644
--- a/opteryx/cursor.py
+++ b/opteryx/cursor.py
@@ -326,8 +326,8 @@ def execute_to_arrow(
         if limit is not None:
             result_data = utils.arrow.limit_records(result_data, limit)  # type: ignore
         try:
-            return pyarrow.concat_tables(result_data, promote_options="none")
-        except pyarrow.ArrowInvalid as err:  # pragma: no cover
+            return pyarrow.concat_tables(result_data, promote_options="permissive")
+        except (pyarrow.ArrowInvalid, pyarrow.ArrowTypeError) as err:  # pragma: no cover
             # DEBUG: log (err)
             if "struct" in str(err):
                 raise InconsistentSchemaError(

diff --git a/opteryx/functions/__init__.py b/opteryx/functions/__init__.py
index 7bb7c543c..6ad55aa05 100644
--- a/opteryx/functions/__init__.py
+++ b/opteryx/functions/__init__.py
@@ -282,7 +282,7 @@ def select_values(boolean_arrays, value_arrays):
     "VARCHAR": cast_varchar,
     "STRING": cast_varchar,
     "STR": cast_varchar,
-    "STRUCT": _iterate_single_parameter(lambda x: orjson.loads(str(x))),
+    "STRUCT": _iterate_single_parameter(lambda x: orjson.loads(str(x)) if x is not None else None),
     "DATE": lambda x: compute.cast(x, pyarrow.date32()),
     "BLOB": array_encode_utf8,
     "TRY_TIMESTAMP": try_cast("TIMESTAMP"),

diff --git a/opteryx/managers/schemes/mabel_partitions.py b/opteryx/managers/schemes/mabel_partitions.py
index cf367cf3e..e76746653 100644
--- a/opteryx/managers/schemes/mabel_partitions.py
+++ b/opteryx/managers/schemes/mabel_partitions.py
@@ -133,7 +133,7 @@ def _inner(*, timestamp):
                 if as_at is None:
                     continue
                 if is_complete_and_not_invalid(control_blobs, as_at):
-                    data_blobs = (blob for blob in data_blobs if as_at in blob)
+                    data_blobs = (blob for blob in data_blobs if as_at in blob)  # type: ignore
                     break
                 data_blobs = [blob for blob in data_blobs if as_at not in blob]
                 as_at = None

diff --git a/opteryx/managers/serde/physical_plan.py b/opteryx/managers/serde/physical_plan.py
new file mode 100644
index 000000000..d7c55a003
--- /dev/null
+++ b/opteryx/managers/serde/physical_plan.py
@@ -0,0 +1,21 @@
+"""
+{
+    "fields":
+        [
+            {
+                "name"
+                "type"
+            }
+        ],
+    "steps": {
+        "id": ,
+        "operator": "name",
+        "columns": [],
+        "config": {},
+        "requires": []
+    }
+}
+
+"""
+
+from orso.types import OrsoTypes

diff --git a/opteryx/operators/__init__.py b/opteryx/operators/__init__.py
index 31612bfe5..f6db8c1ef 100644
--- a/opteryx/operators/__init__.py
+++ b/opteryx/operators/__init__.py
@@ -22,6 +22,7 @@
 from .distinct_node import DistinctNode  # remove duplicate records
 from .exit_node import ExitNode
 from .explain_node import ExplainNode  # EXPLAIN queries
+from .filter_node import FilterNode  # filter unwanted rows
 from .function_dataset_node import FunctionDatasetNode  # Dataset Constructors
 
 # from .heap_sort_node import HeapSortNode  # Heap
@@ -30,12 +31,12 @@
 from .join_node import JoinNode
 from .left_join_node import LeftJoinNode
 from .limit_node import LimitNode  # select the first N records
-from .metadata_writer_node import MetadataWriterNode
-from .morsel_defragment_node import MorselDefragmentNode  # consolidate small morsels
+
+# from .metadata_writer_node import MetadataWriterNode
+# from .morsel_defragment_node import MorselDefragmentNode  # consolidate small morsels
 from .noop_node import NoOpNode  # No Operation
 from .projection_node import ProjectionNode  # remove unwanted columns including renames
 from .read_node import ReaderNode
-from .filter_node import FilterNode  # filter unwanted rows
 from .set_variable_node import SetVariableNode
 from .show_columns_node import ShowColumnsNode  # column details

diff --git a/opteryx/operators/aggregate_and_group_node.py b/opteryx/operators/aggregate_and_group_node.py
index 26f11183e..7fad765f0 100644
--- a/opteryx/operators/aggregate_and_group_node.py
+++ b/opteryx/operators/aggregate_and_group_node.py
@@ -91,7 +91,7 @@ def execute(self) -> Generator[pyarrow.Table, None, None]:
         # we're pretty sure we're going to use - this will fail for datasets
         # larger than memory
         table = pyarrow.concat_tables(
-            project(morsels.execute(), self.all_identifiers), promote_options="none"
+            project(morsels.execute(), self.all_identifiers), promote_options="permissive"
         )
 
         # Allow grouping by functions by evaluating them first

diff --git a/opteryx/operators/async_read_node.py b/opteryx/operators/async_read_node.py
index 0165cb75f..3e33014b2 100644
--- a/opteryx/operators/async_read_node.py
+++ b/opteryx/operators/async_read_node.py
@@ -122,7 +122,7 @@ def execute(self) -> Generator:
             prefix=reader.dataset,
         )
 
-        data_queue = queue.Queue()
+        data_queue: queue.Queue = queue.Queue()
 
         loop = asyncio.new_event_loop()
         threading.Thread(
@@ -176,6 +176,7 @@ def execute(self) -> Generator:
                     self.statistics.add_message(f"failed to read {blob_name}")
                     self.statistics.failed_reads += 1
                     print(f"[READER] Cannot read blob {blob_name} due to {err}")
+                    raise err
 
                 if morsel:
                     self.statistics.columns_read += morsel.num_columns

diff --git a/opteryx/operators/base_plan_node.py b/opteryx/operators/base_plan_node.py
index 220c2f94a..a02ea0907 100644
--- a/opteryx/operators/base_plan_node.py
+++ b/opteryx/operators/base_plan_node.py
@@ -19,12 +19,13 @@
 from opteryx.models import QueryProperties
 from opteryx.models import QueryStatistics
 
+
 class BasePlanNode:
     _producers = None
 
     def to_dict(self) -> dict:
         raise NotImplementedError()
-
+
     @classmethod
     def from_dict(cls, dic: dict) -> "BasePlanNode":
         raise NotImplementedError()

diff --git a/opteryx/operators/metadata_writer_node.py b/opteryx/operators/metadata_writer_node.py_
similarity index 100%
rename from opteryx/operators/metadata_writer_node.py
rename to opteryx/operators/metadata_writer_node.py_

diff --git a/opteryx/operators/morsel_defragment_node.py b/opteryx/operators/morsel_defragment_node.py_
similarity index 100%
rename from opteryx/operators/morsel_defragment_node.py
rename to opteryx/operators/morsel_defragment_node.py_

diff --git a/opteryx/operators/read_node.py b/opteryx/operators/read_node.py
index 8469100ff..731e5e32a 100644
--- a/opteryx/operators/read_node.py
+++ b/opteryx/operators/read_node.py
@@ -82,13 +82,13 @@ def __init__(self, properties: QueryProperties, **parameters):
 
     def to_dict(self) -> dict:
         return {
-            "identity": f"read-{self.identity}",
+            "identity": f"read-{self.identity}",
             "operator": "ReadNode",
             "schema": self.columns,
             "projection": self.columns,
             "filters": self.predicates,
         }
-
+
     @classmethod
     def from_dict(cls, dic: dict) -> "BasePlanNode":
         raise NotImplementedError()
@@ -127,9 +127,7 @@ def execute(self) -> Generator:
         orso_schema.columns = orso_schema_cols
         arrow_schema = None
         start_clock = time.monotonic_ns()
-        reader = self.connector.read_dataset(
-            columns=self.columns, predicates=self.predicates
-        )
+        reader = self.connector.read_dataset(columns=self.columns, predicates=self.predicates)
         for morsel in reader:
             morsel = normalize_morsel(orso_schema, morsel)
             if arrow_schema:

diff --git a/opteryx/operators/sort_node.py b/opteryx/operators/sort_node.py
index 783d35f4e..1459242ae 100644
--- a/opteryx/operators/sort_node.py
+++ b/opteryx/operators/sort_node.py
@@ -98,7 +98,7 @@ def execute(self) -> Generator:
                         direction,
                     )
                 )
-            except ColumnNotFoundError as cnfe:
+            except ColumnNotFoundError as cnfe:  # pragma: no cover
                 raise ColumnNotFoundError(
                     f"`ORDER BY` must reference columns as they appear in the `SELECT` clause. {cnfe}"
                 )

diff --git a/opteryx/shared/memory_pool.py b/opteryx/shared/memory_pool.py
index 6503719ac..1aa3e2c28 100644
--- a/opteryx/shared/memory_pool.py
+++ b/opteryx/shared/memory_pool.py
@@ -40,7 +40,7 @@ def __init__(
         size: int,
         name: str = "Memory Pool",
     ):
-        if size <= 0:
+        if size <= 0:  # pragma: no cover
             raise ValueError("MemoryPool size must be a positive integer")
         self.lock = Lock()
         self.pool = bytearray(size)
@@ -188,12 +188,12 @@ def read(self, ref_id: int, zero_copy: bool = True) -> Union[bytes, memoryview]:
         We use this approach because locks are expensive and memory pools
         are likely to be read heavy.
         """
-        if ref_id not in self.used_segments:
+        if ref_id not in self.used_segments:  # pragma: no cover
             raise ValueError("Invalid reference ID.")
         self.reads += 1
         segment = self.used_segments[ref_id]
         view = memoryview(self.pool)[segment.start : segment.start + segment.length]
-        if segment != self.used_segments[ref_id]:
+        if segment != self.used_segments[ref_id]:  # pragma: no cover
             with self.lock:
                 if ref_id not in self.used_segments:
                     raise ValueError("Invalid reference ID.")
@@ -210,7 +210,7 @@ def release(self, ref_id: int):
         """
         self.releases += 1
         with self.lock:
-            if ref_id not in self.used_segments:
+            if ref_id not in self.used_segments:  # pragma: no cover
                 raise ValueError(f"Invalid reference ID - {ref_id}.")
             segment = self.used_segments.pop(ref_id)
             self.free_segments.append(segment)
@@ -222,7 +222,7 @@ def read_and_release(self, ref_id: int, zero_copy: bool = True) -> Union[bytes, memoryview]:
         with self.lock:
             self.reads += 1
             self.releases += 1
-            if ref_id not in self.used_segments:
+            if ref_id not in self.used_segments:  # pragma: no cover
                 raise ValueError("Invalid reference ID.")
             self.read_locks += 1
             segment = self.used_segments.pop(ref_id)
@@ -233,7 +233,7 @@ def read_and_release(self, ref_id: int, zero_copy: bool = True) -> Union[bytes, memoryview]:
             return bytes(view)
 
     @property
-    def stats(self) -> dict:
+    def stats(self) -> dict:  # pragma: no cover
         return {
             "free_segments": len(self.free_segments),
             "used_segments": len(self.used_segments),
@@ -274,12 +274,3 @@ async def read(self, ref_id: int) -> bytes:
     async def release(self, ref_id: int):
         async with self.lock:
             self.pool.release(ref_id)
-
-    async def can_commit(self, data: bytes) -> bool:
-        async with self.lock:
-            return self.pool.can_commit(data)
-
-    @property
-    async def stats(self):
-        async with self.lock:
-            return self.pool.stats

diff --git a/opteryx/utils/file_decoders.py b/opteryx/utils/file_decoders.py
index d22b61687..f002915a1 100644
--- a/opteryx/utils/file_decoders.py
+++ b/opteryx/utils/file_decoders.py
@@ -26,6 +26,8 @@
 from orso.tools import random_string
 
 from opteryx.exceptions import UnsupportedFileTypeError
+from opteryx.managers.expression import NodeType
+from opteryx.managers.expression import get_all_nodes_of_type
 from opteryx.utils.arrow import post_read_projector
 from opteryx.utils.memory_view_stream import MemoryViewStream
 
@@ -128,8 +130,10 @@ def parquet_decoder(buffer, projection: List = None, selection=None, just_schema: bool = False):
     from opteryx.connectors.capabilities import PredicatePushable
 
+    columns_in_filters = {c.value for c in get_all_nodes_of_type(selection, (NodeType.IDENTIFIER,))}
+
     # Convert the selection to DNF format if applicable
-    _select, selection = PredicatePushable.to_dnf(selection) if selection else (None, None)
+    dnf_filter, selection = PredicatePushable.to_dnf(selection) if selection else (None, None)
 
     selected_columns = None
     stream: BinaryIO = None
@@ -154,6 +158,23 @@ def parquet_decoder(buffer, projection: List = None, selection=None, just_schema: bool = False):
     # If no columns are selected, set to None
     if not selected_columns:
         selected_columns = None
+    if not columns_in_filters.issubset(schema_columns_set):
+        if selected_columns is None:
+            selected_columns = list(schema_columns_set)
+        fields = [pyarrow.field(name, pyarrow.string()) for name in selected_columns]
+        schema = pyarrow.schema(fields)
+
+        # Create an empty table with the schema
+        empty_table = pyarrow.Table.from_arrays(
+            [pyarrow.array([], type=schema.field(i).type) for i in range(len(fields))],
+            schema=schema,
+        )
+
+        return (
+            parquet_file.metadata.num_rows,
+            parquet_file.metadata.num_columns,
+            empty_table,
+        )
 
     # Special handling for projection of [] (usually means COUNT(*))
     if projection == []:
 
     # Read the parquet table with the optimized column list and selection filters
     table = parquet.read_table(
-        stream, columns=selected_columns, pre_buffer=False, filters=_select, use_threads=False
+        stream, columns=selected_columns, pre_buffer=False, filters=dnf_filter, use_threads=False
     )
     if selection:
         table = filter_records(selection, table)

diff --git a/opteryx/utils/paths.py b/opteryx/utils/paths.py
index fede2c6f7..92c13dc44 100644
--- a/opteryx/utils/paths.py
+++ b/opteryx/utils/paths.py
@@ -19,8 +19,6 @@
 
 
 def get_parts(path_string: str):
-    if not path_string:
-        raise ValueError("get_parts: path_string must have a value")
 
     # Validate against path traversal and home directory references
     if ".." in path_string or path_string.startswith("~"):
@@ -48,7 +46,3 @@ def get_parts(path_string: str):
     parts_path = OS_SEP.join(parts)
 
     return bucket, parts_path, file_name, suffix
-
-
-def is_file(path):
-    return os.path.isfile(path)

diff --git a/opteryx/utils/series.py b/opteryx/utils/series.py
index 09b23a620..f1fd99b74 100644
--- a/opteryx/utils/series.py
+++ b/opteryx/utils/series.py
@@ -28,7 +28,7 @@ def generate_series(*args):
         OrsoTypes.INTEGER,
         OrsoTypes.DOUBLE,
     ):
-        if arg_len not in (1, 2, 3):
+        if arg_len not in (1, 2, 3):  # pragma: no cover
             raise SqlError(
                 "generate_series for numbers takes 1 (stop), 2 (start, stop) or 3 (start, stop, interval) parameters."
             )
@@ -36,7 +36,7 @@ def generate_series(*args):
 
     # if the params are timestamps, we create time intervals
     if args[0].type in (OrsoTypes.DATE, OrsoTypes.TIMESTAMP):
-        if arg_len != 3:
+        if arg_len != 3:  # pragma: no cover
             raise SqlError("generate_series for dates needs start, end, and interval parameters")
         return dates.date_range(*arg_vals)
@@ -73,7 +73,7 @@ def numeric_range(*args) -> numpy.ndarray:
         start, stop = args
     elif len(args) == 3:
         start, stop, step = args
-    else:
+    else:  # pragma: no cover
         raise ValueError("Invalid number of arguments. Expected 1, 2, or 3: start, stop [, step].")
 
     # Determine dtype

diff --git a/testdata/partitioned/dated/year_2020/month_02/day_03/as_at_00000000/frame.complete b/testdata/partitioned/dated/year_2024/month_02/day_03/as_at_00000000/frame.complete
similarity index 100%
rename from testdata/partitioned/dated/year_2020/month_02/day_03/as_at_00000000/frame.complete
rename to testdata/partitioned/dated/year_2024/month_02/day_03/as_at_00000000/frame.complete

diff --git a/testdata/partitioned/dated/year_2020/month_02/day_03/as_at_00000000/tweets-0000.jsonl b/testdata/partitioned/dated/year_2024/month_02/day_03/as_at_00000000/tweets-0000.jsonl
similarity index 100%
rename from testdata/partitioned/dated/year_2020/month_02/day_03/as_at_00000000/tweets-0000.jsonl
rename to testdata/partitioned/dated/year_2024/month_02/day_03/as_at_00000000/tweets-0000.jsonl

diff --git a/testdata/partitioned/dated/year_2020/month_02/day_04/as_at_11111111/frame.complete b/testdata/partitioned/dated/year_2024/month_02/day_04/as_at_11111111/frame.complete
similarity index 100%
rename from testdata/partitioned/dated/year_2020/month_02/day_04/as_at_11111111/frame.complete
rename to testdata/partitioned/dated/year_2024/month_02/day_04/as_at_11111111/frame.complete

diff --git a/testdata/partitioned/dated/year_2020/month_02/day_04/as_at_11111111/tweets-0001.jsonl b/testdata/partitioned/dated/year_2024/month_02/day_04/as_at_11111111/tweets-0001.jsonl
similarity index 100%
rename from testdata/partitioned/dated/year_2020/month_02/day_04/as_at_11111111/tweets-0001.jsonl
rename to testdata/partitioned/dated/year_2024/month_02/day_04/as_at_11111111/tweets-0001.jsonl

diff --git a/tests/misc/test_memory_pool.py b/tests/misc/test_memory_pool.py
index 26fa6e6fd..391e685a7 100644
--- a/tests/misc/test_memory_pool.py
+++ b/tests/misc/test_memory_pool.py
@@ -5,6 +5,8 @@
 import time
 import threading
 
+os.environ["OPTERYX_DEBUG"] = "1"
+
 sys.path.insert(1, os.path.join(sys.path[0], "../.."))
 
 from opteryx.shared import MemoryPool
@@ -126,7 +128,7 @@ def test_repeated_commits_and_releases():
 
     mp._level1_compaction()
     assert (
-        mp.free_segments[0]["length"] == mp.size
+        mp.free_segments[0].length == mp.size
     ), f"Memory leak detected after repeated commits and releases. {mp.free_segments[0]['length']} != {mp.size}\n{mp.free_segments}"
 
 
@@ -156,7 +158,10 @@ def test_stress_with_random_sized_data():
             mp.read_and_release(ref)
             refs.discard(ref)
     # Ensure that the pool or leaking
-    assert mp.available_space() >= mp.size, "Memory fragmentation or leak detected."
+    mp._level1_compaction()
+    assert (
+        mp.available_space() == mp.size
+    ), f"Memory fragmentation or leak detected.\n{mp.available_space()} != {mp.size}"
 
     mp._level1_compaction()
     assert len(mp.free_segments) == 1

diff --git a/tests/misc/test_memory_pool_async.py b/tests/misc/test_memory_pool_async.py
index 7b8c3f2d6..bf1ba54b9 100644
--- a/tests/misc/test_memory_pool_async.py
+++ b/tests/misc/test_memory_pool_async.py
@@ -60,6 +60,7 @@ async def remove_random_data():
             await mp.release(ref)
 
     # Ensure all memory is accounted for
+    bmp._level1_compaction()
     assert bmp.available_space() == bmp.size, "Memory leak or fragmentation detected."
 

diff --git a/tests/sql_battery/test_shapes_and_errors_battery.py b/tests/sql_battery/test_shapes_and_errors_battery.py
index 30fcac407..0f3606daa 100644
--- a/tests/sql_battery/test_shapes_and_errors_battery.py
+++ b/tests/sql_battery/test_shapes_and_errors_battery.py
@@ -657,23 +657,23 @@
     ("SELECT * FROM 'testdata/flat/../flat/formats/arrow/tweets.arrow'", None, None, DatasetNotFoundError), # don't allow traversal
 
     ("SELECT * FROM testdata.partitioned.dated", None, None, EmptyDatasetError), # it's there, but no partitions for today
-    ("SELECT * FROM testdata.partitioned.dated FOR '2020-02-03' WITH (NO_CACHE)", 25, 8, None),
-    ("SELECT * FROM testdata.partitioned.dated FOR '2020-02-03'", 25, 8, None),
-    ("SELECT * FROM testdata.partitioned.dated FOR '2020-02-04'", 25, 8, None),
-    ("SELECT * FROM testdata.partitioned.dated FOR DATES BETWEEN '2020-02-01' AND '2020-02-28'", 50, 8, None),
-    ("SELECT * FROM testdata.partitioned.dated FOR '2020-02-03' OFFSET 1", 24, 8, None),
-    ("SELECT * FROM testdata.partitioned.dated FOR DATES BETWEEN '2020-02-01' AND '2020-02-28' OFFSET 1", 49, 8, None),
+    ("SELECT * FROM testdata.partitioned.dated FOR '2024-02-03' WITH (NO_CACHE)", 25, 8, None),
+    ("SELECT * FROM testdata.partitioned.dated FOR '2024-02-03'", 25, 8, None),
+    ("SELECT * FROM testdata.partitioned.dated FOR '2024-02-04'", 25, 8, None),
+    ("SELECT * FROM testdata.partitioned.dated FOR DATES BETWEEN '2024-02-01' AND '2024-02-28'", 50, 8, None),
+    ("SELECT * FROM testdata.partitioned.dated FOR '2024-02-03' OFFSET 1", 24, 8, None),
+    ("SELECT * FROM testdata.partitioned.dated FOR DATES BETWEEN '2024-02-01' AND '2024-02-28' OFFSET 1", 49, 8, None),
     ("SELECT * FROM $satellites FOR YESTERDAY ORDER BY planetId OFFSET 10", 167, 8, None),
 
-    ("SELECT * FROM testdata.partitioned.dated FOR '2020-02-03 00:00' WITH (NO_CACHE)", 25, 8, None),
-    ("SELECT * FROM testdata.partitioned.dated FOR '2020-02-03 12:00'", 25, 8, None),
-    ("SELECT * FROM testdata.partitioned.dated FOR '2020-02-04 00:00'", 25, 8, None),
-    ("SELECT * FROM testdata.partitioned.dated FOR DATES BETWEEN '2020-02-01 00:00' AND '2020-02-28 00:00'", 50, 8, None),
-    ("SELECT * FROM testdata.partitioned.dated FOR '2020-02-03 00:00' OFFSET 1", 24, 8, None),
-    ("SELECT * FROM testdata.partitioned.dated FOR DATES BETWEEN '2020-02-01 00:00' AND '2020-02-28 00:00' OFFSET 1", 49, 8, None),
+    ("SELECT * FROM testdata.partitioned.dated FOR '2024-02-03 00:00' WITH (NO_CACHE)", 25, 8, None),
+    ("SELECT * FROM testdata.partitioned.dated FOR '2024-02-03 12:00'", 25, 8, None),
+    ("SELECT * FROM testdata.partitioned.dated FOR '2024-02-04 00:00'", 25, 8, None),
+    ("SELECT * FROM testdata.partitioned.dated FOR DATES BETWEEN '2024-02-01 00:00' AND '2024-02-28 00:00'", 50, 8, None),
+    ("SELECT * FROM testdata.partitioned.dated FOR '2024-02-03 00:00' OFFSET 1", 24, 8, None),
+    ("SELECT * FROM testdata.partitioned.dated FOR DATES BETWEEN '2024-02-01 00:00' AND '2024-02-28 00:00' OFFSET 1", 49, 8, None),
 
-    ("SELECT * FROM testdata.partitioned.dated FOR DATES SINCE '2020-02-01'", 50, 8, None),
-    ("SELECT * FROM testdata.partitioned.dated FOR DATES SINCE '2020-02-04 00:00'", 25, 8, None),
+    ("SELECT * FROM testdata.partitioned.dated FOR DATES SINCE '2024-02-01'", 50, 8, None),
+    ("SELECT * FROM testdata.partitioned.dated FOR DATES SINCE '2024-02-04 00:00'", 25, 8, None),
 
     ("SELECT * FROM testdata.partitioned.mixed FOR '2020-02-03'", None, None, UnsupportedSegementationError),
     ("SELECT * FROM $planets FOR '1730-01-01'", 6, 20, None),
@@ -784,7 +784,7 @@
     ("SHOW EXTENDED COLUMNS FROM $planets", 20, 12, None),
    ("SHOW EXTENDED COLUMNS FROM $astronauts", 19, 12, None),
     ("SHOW COLUMNS FROM $satellites LIKE '%d'", 2, 4, UnsupportedSyntaxError),
-    ("SHOW COLUMNS FROM testdata.partitioned.dated FOR '2020-02-03'", 8, 4, None),
+    ("SHOW COLUMNS FROM testdata.partitioned.dated FOR '2024-02-03'", 8, 4, None),
 
     ("SELECT * FROM $satellites CROSS JOIN $astronauts", 63189, 27, None),
     ("SELECT * FROM $satellites WITH (NO_CACHE) CROSS JOIN $astronauts WITH (NO_CACHE)", 63189, 27, None),
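The memory-pool test changes in the patch above lean on the pool's commit/read/release lifecycle plus compaction. A rough usage sketch, assuming only the methods visible in these hunks (this is illustrative, not the full API):

    # Sketch of the MemoryPool lifecycle the tests above exercise.
    from opteryx.shared import MemoryPool

    mp = MemoryPool(size=1000)

    ref = mp.commit(b"some bytes")   # returns a reference, or None if nothing fits
    assert ref is not None
    view = mp.read(ref)              # default zero-copy read returns a memoryview
    assert bytes(view) == b"some bytes"
    mp.release(ref)                  # hand the segment back to the free list

    mp._level1_compaction()          # merge adjacent free segments
    assert mp.available_space() == mp.size  # nothing leaked

This is also why the tests now call `_level1_compaction()` before asserting on `available_space()`: after many random-sized commits the free space is correct but fragmented across several segments.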
From aec3011623dc3feb82cbf3c55a2724b09274cd79 Mon Sep 17 00:00:00 2001
From: XB500
Date: Sat, 4 May 2024 22:21:41 +0000
Subject: [PATCH 3/7] Opteryx Version 0.14.2-beta.469

---
 opteryx/__version__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/opteryx/__version__.py b/opteryx/__version__.py
index e881b9a7e..b57a4b75e 100644
--- a/opteryx/__version__.py
+++ b/opteryx/__version__.py
@@ -1,4 +1,4 @@
-__build__ = 466
+__build__ = 469
 
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.

From 1800ad175bd567259af57f08a5d1b23ee3f08a3b Mon Sep 17 00:00:00 2001
From: joocer
Date: Sun, 5 May 2024 00:23:46 +0100
Subject: [PATCH 4/7] HOUSEKEEPING

---
 .../capabilities/predicate_pushable.py         |   2 +-
 opteryx/connectors/cql_connector.py            |  22 +--
 opteryx/models/node.py                         | 150 ------------------
 opteryx/operators/base_plan_node.py            |   6 +-
 .../strategies/boolean_simplication.py         |   2 +-
 opteryx/third_party/query_builder/builder.py   |   6 +-
 opteryx/utils/arrow.py                         |  12 --
 opteryx/utils/dates.py                         |   4 +-
 opteryx/utils/file_decoders.py                 |   2 +-
 opteryx/utils/memory_view_stream.py            |  15 +-
 10 files changed, 25 insertions(+), 196 deletions(-)
 delete mode 100644 opteryx/models/node.py

diff --git a/opteryx/connectors/capabilities/predicate_pushable.py b/opteryx/connectors/capabilities/predicate_pushable.py
index 2fc757b59..ef88d5775 100644
--- a/opteryx/connectors/capabilities/predicate_pushable.py
+++ b/opteryx/connectors/capabilities/predicate_pushable.py
@@ -72,7 +72,7 @@ def to_dnf(root):
     def _predicate_to_dnf(root):
         # Reduce look-ahead effort by using Exceptions to control flow
-        if root.node_type == NodeType.AND:
+        if root.node_type == NodeType.AND:  # pragma: no cover
             left = _predicate_to_dnf(root.left)
             right = _predicate_to_dnf(root.right)
             if not isinstance(left, list):

diff --git a/opteryx/connectors/cql_connector.py b/opteryx/connectors/cql_connector.py
index eed574aa8..e9d7c7543 100644
--- a/opteryx/connectors/cql_connector.py
+++ b/opteryx/connectors/cql_connector.py
@@ -133,24 +133,18 @@ def read_dataset(  # type:ignore
         # Update SQL if we've pushed predicates
         parameters: list = []
         for predicate in predicates:
-            if predicate.node_type == NodeType.UNARY_OPERATOR:
-                operator = self.OPS_XLAT[predicate.value]
-                operand, parameters = _handle_operand(predicate.centre, parameters)
-                operator = operator.replace(":operand", operand)
-                query_builder.WHERE(operator)
-            else:
-                left_operand = predicate.left
-                right_operand = predicate.right
-                operator = self.OPS_XLAT[predicate.value]
+            left_operand = predicate.left
+            right_operand = predicate.right
+            operator = self.OPS_XLAT[predicate.value]
 
-                left_value, parameters = _handle_operand(left_operand, parameters)
-                right_value, parameters = _handle_operand(right_operand, parameters)
+            left_value, parameters = _handle_operand(left_operand, parameters)
+            right_value, parameters = _handle_operand(right_operand, parameters)
 
-                operator = operator.replace(":left", left_value)
-                operator = operator.replace(":right", right_value)
+            operator = operator.replace(":left", left_value)
+            operator = operator.replace(":right", right_value)
 
-                query_builder.WHERE(operator)
+            query_builder.WHERE(operator)
 
         session = self.cluster.connect()
         # DEBUG: log ("READ DATASET\n", str(query_builder))

diff --git a/opteryx/models/node.py b/opteryx/models/node.py
deleted file mode 100644
index 4df474c2e..000000000
--- a/opteryx/models/node.py
+++ /dev/null
@@ -1,150 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""
-Node Module
-
-This module contains the Node class, which provides an implementation for dynamic attribute management.
-
-Noteworthy features and design choices:
-
-1. Dynamic Attributes: The Node class allows you to set and get attributes dynamically, storing them in an internal dictionary.
-2. Attribute Validation: Attributes starting with an underscore are not allowed.
-3. Property Access: The class provides a `properties` method that exposes the internal attributes for external use.
-4. Attribute Defaults: When attempting to access an attribute that doesn't exist, the `__getattr__` method will return None.
-5. Deep Copy: The `copy` method allows for deep copying of the Node object, preserving the structure and values of all internal attributes.
-6. JSON Representation: The `__str__` method returns a JSON representation of the internal attributes, which can be helpful for debugging or serialization.
-
-"""
-
-import copy
-from typing import Any
-from typing import Dict
-from typing import Union
-
-
-class Node:
-
-    __slots__ = ("_internal", "node_type")
-
-    def __init__(self, node_type: Union[str, int, None] = None, **kwargs: Any):
-        """
-        Initialize a Node with attributes.
-
-        Parameters:
-            node_type: str, optional
-                The type of the node.
-            **kwargs: Any
-                Dynamic attributes for the node.
-        """
-        object.__setattr__(
-            self, "_internal", kwargs
-        )  # Directly set _internal using the base method
-        self.node_type = node_type
-
-    @property
-    def properties(self) -> Dict[str, Any]:
-        """
-        Get the internal properties of the Node.
-
-        Returns:
-            Dict[str, Any]: The internal properties.
-        """
-        return self._internal
-
-    def __getattr__(self, name: str) -> Any:
-        """
-        Retrieve attribute from the internal dictionary or the _identity.
-
-        Parameters:
-            name: str
-                The name of the attribute to retrieve.
-
-        Returns:
-            Any: The attribute value.
-        """
-        if name in self._internal:
-            return self._internal[name]
-        return None
-
-    def __setattr__(self, name: str, value: Any) -> None:
-        """
-        Set attribute in the internal dictionary.
-
-        Parameters:
-            name: str
-                The name of the attribute.
-            value: Any
-                The value to set.
-        """
-        if name == "_internal":
-            self._internal = value
-        else:
-            if value is None:
-                self._internal.pop(name, None)
-            else:
-                self._internal[name] = value
-
-    def __str__(self) -> str:
-        """
-        Return a string representation of the Node using JSON serialization.
-
-        Returns:
-            str: The JSON string representation.
-        """
-        import orjson
-
-        return orjson.dumps(self._internal, default=str).decode()
-
-    def __repr__(self) -> str:
-        """
-        Provide a detailed representation for debugging.
-
-        Returns:
-            str: A string representation useful for debugging.
-        """
-        node_type = str(self.node_type)
-        if node_type.startswith("LogicalPlanStepType."):
-            node_type = node_type[20:]
-        return f"<Node type={node_type}>"
-
-    def copy(self) -> "Node":
-        """
-        Create an independent deep copy of the node.
-
-        Returns:
-            Node: The new, independent deep copy.
-        """
-
-        def _inner_copy(obj: Any) -> Any:
-            """
-            Create an independent inner copy of the given object.
-
-            Parameters:
-                obj: Any
-                    The object to be deep copied.
-
-            Returns:
-                Any: The new, independent deep copy.
-            """
-            obj_type = type(obj)
-            if obj_type in (list, tuple, set):
-                return obj_type(_inner_copy(item) for item in obj)
-            if obj_type == dict:
-                return {key: _inner_copy(value) for key, value in obj.items()}
-            if hasattr(obj, "copy"):
-                return obj.copy()
-            if hasattr(obj, "deepcopy"):
-                return copy.deepcopy(obj)
-            return obj
-
-        new_node = Node(**{key: _inner_copy(value) for key, value in self._internal.items()})
-        return new_node

diff --git a/opteryx/operators/base_plan_node.py b/opteryx/operators/base_plan_node.py
index a02ea0907..a2977a56e 100644
--- a/opteryx/operators/base_plan_node.py
+++ b/opteryx/operators/base_plan_node.py
@@ -23,11 +23,11 @@ class BasePlanNode:
     _producers = None
 
-    def to_dict(self) -> dict:
+    def to_dict(self) -> dict:  # pragma: no cover
         raise NotImplementedError()
 
     @classmethod
-    def from_dict(cls, dic: dict) -> "BasePlanNode":
+    def from_dict(cls, dic: dict) -> "BasePlanNode":  # pragma: no cover
         raise NotImplementedError()
 
     def __init__(self, properties: QueryProperties, **parameters):
@@ -67,5 +67,5 @@ def config(self):  # pragma: no cover
         """
         return ""
 
-    def execute(self) -> Generator[pyarrow.Table, None, None]:
+    def execute(self) -> Generator[pyarrow.Table, None, None]:  # pragma: no cover
         pass

diff --git a/opteryx/planner/cost_based_optimizer/strategies/boolean_simplication.py b/opteryx/planner/cost_based_optimizer/strategies/boolean_simplication.py
index 57f089c26..73b79a676 100644
--- a/opteryx/planner/cost_based_optimizer/strategies/boolean_simplication.py
+++ b/opteryx/planner/cost_based_optimizer/strategies/boolean_simplication.py
@@ -17,7 +17,7 @@
 Goal: Preposition for following actions
 """
 from opteryx.managers.expression import NodeType
-from opteryx.models.node import Node
+from opteryx.models import Node
 from opteryx.planner.logical_planner import LogicalPlan
 from opteryx.planner.logical_planner import LogicalPlanNode
 from opteryx.planner.logical_planner import LogicalPlanStepType

diff --git a/opteryx/third_party/query_builder/builder.py b/opteryx/third_party/query_builder/builder.py
index 0219a2363..68cd6b333 100644
--- a/opteryx/third_party/query_builder/builder.py
+++ b/opteryx/third_party/query_builder/builder.py
@@ -45,7 +45,7 @@ def add(self, keyword, *args):
         target = self.data[keyword]
 
         if flag:
-            if target.flag:
+            if target.flag:  # pragma: no cover
                 raise ValueError(f"{keyword} already has flag: {flag!r}")
             target.flag = flag
 
@@ -69,7 +69,7 @@ def _resolve_fakes(self, keyword):
     def _resolve_flags(self, keyword):
         prefix, _, flag = keyword.partition(" ")
         if prefix in self.flag_keywords:
-            if flag and flag not in self.flag_keywords[prefix]:
+            if flag and flag not in self.flag_keywords[prefix]:  # pragma: no cover
                 raise ValueError(f"invalid flag for {prefix}: {flag!r}")
             return prefix, flag
         return keyword, ""
@@ -134,7 +134,7 @@ def from_arg(cls, arg, **kwargs):
             alias, value = "", arg
         elif len(arg) == 2:
             alias, value = arg
-        else:
+        else:  # pragma: no cover
             raise ValueError(f"invalid arg: {arg!r}")
         return cls(_clean_up(value), _clean_up(alias), **kwargs)

diff --git a/opteryx/utils/arrow.py b/opteryx/utils/arrow.py
index 0ea3b7740..cf482641f 100644
--- a/opteryx/utils/arrow.py
+++ b/opteryx/utils/arrow.py
@@ -17,7 +17,6 @@
 from typing import Iterator
 from typing import Optional
 
-import orjson
 import pyarrow
 
 INTERNAL_BATCH_SIZE = 500
@@ -99,17 +98,6 @@ def post_read_projector(table: pyarrow.Table, columns: list) -> pyarrow.Table:
     return table.rename_columns(column_names)
 
 
-def update_metadata(table: pyarrow.Table, metadata: dict) -> pyarrow.Table:
-    table_metadata = table.schema.metadata or {}
-    for k, v in metadata.items():
-        table_metadata[k] = orjson.dumps(v)
-
-    schema = pyarrow.schema(
-        [table.field(name) for name in table.schema.names], metadata=table_metadata
-    )
-    return table.cast(schema)
-
-
 def align_tables(source_table, append_table, source_indices, append_indices):
     # If either source_indices or append_indices is empty, return the source_table taken with source_indices immediately
     if len(source_indices) == 0 or len(append_indices) == 0:

diff --git a/opteryx/utils/dates.py b/opteryx/utils/dates.py
index 1506ebdff..cb784e5d4 100644
--- a/opteryx/utils/dates.py
+++ b/opteryx/utils/dates.py
@@ -91,7 +91,7 @@ def date_range(start_date, end_date, interval: str):
         raise ValueError("Cannot create an series with the provided start and end dates")
 
     # if the dates are the same, return that date
-    if start_date == end_date:  # pragme: no cover
+    if start_date == end_date:  # pragma: no cover
         yield start_date
         return
 
@@ -222,6 +222,6 @@ def date_trunc(truncate_to, date_value):
         return datetime.datetime(date_value.year, date_value.month, date_value.day, date_value.hour, date_value.minute, tzinfo=date_value.tzinfo)
     elif truncate_to == "second":
         return datetime.datetime(date_value.year, date_value.month, date_value.day, date_value.hour, date_value.minute, date_value.second, tzinfo=date_value.tzinfo)
-    else:
+    else:  # pragma: no cover
         raise ValueError("Invalid unit: {}".format(truncate_to))
 # fmt:on

diff --git a/opteryx/utils/file_decoders.py b/opteryx/utils/file_decoders.py
index f002915a1..b9965d6c5 100644
--- a/opteryx/utils/file_decoders.py
+++ b/opteryx/utils/file_decoders.py
@@ -312,7 +312,7 @@ def avro_decoder(buffer, projection: List = None, selection=None, just_schema: bool = False):
     try:
         from avro.datafile import DataFileReader
         from avro.io import DatumReader
-    except ImportError:  # pragma: no-cover
+    except ImportError:  # pragma: no cover
         raise Exception("`avro` is missing, please install or include in your `requirements.txt`.")
 
     stream: BinaryIO = None

diff --git a/opteryx/utils/memory_view_stream.py b/opteryx/utils/memory_view_stream.py
index a1d9ac662..d34b32f3c 100644
--- a/opteryx/utils/memory_view_stream.py
+++ b/opteryx/utils/memory_view_stream.py
@@ -85,26 +85,23 @@ def __next__(self) -> bytes:
     def fileno(self) -> int:
         return -1
 
-    def flush(self) -> None:
+    def flush(self) -> None:  # pragma: no cover
         raise io.UnsupportedOperation()
 
     def isatty(self) -> bool:
         return False
 
-    def readline(self, limit: int = -1):
+    def readline(self, limit: int = -1):  # pragma: no cover
         raise io.UnsupportedOperation()
 
-    def readlines(self, hint: int = -1) -> list:
+    def readlines(self, hint: int = -1) -> list:  # pragma: no cover
         raise io.UnsupportedOperation()
 
-    def truncate(self):
-
+    def truncate(self):  # pragma: no cover
         raise io.UnsupportedOperation()
 
-    def write(self):
-
+    def write(self):  # pragma: no cover
         raise io.UnsupportedOperation()
 
-    def writelines(self):
-
+    def writelines(self):  # pragma: no cover
         raise io.UnsupportedOperation()
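For orientation on the predicate-pushdown plumbing touched above (and the `dnf_filter` that patch 2 hands to `parquet.read_table`): `PredicatePushable.to_dnf` produces filters in pyarrow's disjunctive-normal-form shape — a list of AND-groups that are OR-ed together. A hand-rolled illustration; the column names and file path are made up:

    import pyarrow.parquet as parquet

    # (a == 1 AND b > 2) OR (a == 3), in pyarrow's DNF filter shape
    dnf_filter = [
        [("a", "==", 1), ("b", ">", 2)],  # first AND-group
        [("a", "==", 3)],                 # OR-ed with this one
    ]

    # read_table applies the filters while scanning row groups, so rows are
    # dropped before they are materialised
    table = parquet.read_table("example.parquet", filters=dnf_filter)

Predicates that cannot be expressed in this shape are kept in `selection` and applied after the read by `filter_records`, which is why `to_dnf` returns both pieces.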
From 417c4d1c64748bd10fc7c409fe3c69940833c377 Mon Sep 17 00:00:00 2001
From: XB500
Date: Sat, 4 May 2024 23:24:25 +0000
Subject: [PATCH 5/7] Opteryx Version 0.14.2-beta.470

---
 opteryx/__version__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/opteryx/__version__.py b/opteryx/__version__.py
index b57a4b75e..0acf3f560 100644
--- a/opteryx/__version__.py
+++ b/opteryx/__version__.py
@@ -1,4 +1,4 @@
-__build__ = 469
+__build__ = 470
 
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.

From 6229307868cdffdceccc3933bb9e2c8775181c89 Mon Sep 17 00:00:00 2001
From: joocer
Date: Sun, 5 May 2024 00:38:11 +0100
Subject: [PATCH 6/7] HOUSEKEEPING

---
 opteryx/connectors/gcp_cloudstorage_connector.py |  2 +-
 opteryx/planner/temporary_physical_planner.py    | 10 +++++-----
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/opteryx/connectors/gcp_cloudstorage_connector.py b/opteryx/connectors/gcp_cloudstorage_connector.py
index e8337cdc0..2754e8e40 100644
--- a/opteryx/connectors/gcp_cloudstorage_connector.py
+++ b/opteryx/connectors/gcp_cloudstorage_connector.py
@@ -201,7 +201,7 @@ def get_list_of_blob_names(self, *, prefix: str) -> List[str]:
         headers = {"Authorization": f"Bearer {self.access_token}"}
         params = None
 
-        blob_names = []
+        blob_names: List[str] = []
         while True:
             response = self.session.get(url, headers=headers, timeout=30, params=params)

diff --git a/opteryx/planner/temporary_physical_planner.py b/opteryx/planner/temporary_physical_planner.py
index 149fd7ae3..fa52c04a3 100644
--- a/opteryx/planner/temporary_physical_planner.py
+++ b/opteryx/planner/temporary_physical_planner.py
@@ -37,8 +37,8 @@ def create_physical_plan(logical_plan, query_properties):
             node = operators.AggregateNode(query_properties, aggregates=node_config["aggregates"])
         elif node_type == LogicalPlanStepType.AggregateAndGroup:
             node = operators.AggregateAndGroupNode(query_properties, groups=node_config["groups"], aggregates=node_config["aggregates"], projection=node_config["projection"])
-        elif node_type == LogicalPlanStepType.Defragment:
-            node = operators.MorselDefragmentNode(query_properties, **node_config)
+#        elif node_type == LogicalPlanStepType.Defragment:
+#            node = operators.MorselDefragmentNode(query_properties, **node_config)
         elif node_type == LogicalPlanStepType.Distinct:
             node = operators.DistinctNode(query_properties, **node_config)
         elif node_type == LogicalPlanStepType.Exit:
@@ -87,9 +87,9 @@ def create_physical_plan(logical_plan, query_properties):
             node = operators.NoOpNode(query_properties, **node_config)
         elif node_type == LogicalPlanStepType.Union:
             node = operators.UnionNode(query_properties, **node_config)
-        elif node_type == LogicalPlanStepType.MetadataWriter:
-            node = operators.MetadataWriterNode(query_properties, **node_config)
-        else:
+#        elif node_type == LogicalPlanStepType.MetadataWriter:
+#            node = operators.MetadataWriterNode(query_properties, **node_config)
+        else:  # pragma: no cover
             raise Exception(f"something unexpected happed - {node_type.name}")
     # fmt: on
     plan.add_node(nid, node)

From bd129a1b4a813a6992df4b2d0be870cf12b89923 Mon Sep 17 00:00:00 2001
From: XB500
Date: Sat, 4 May 2024 23:38:38 +0000
Subject: [PATCH 7/7] Opteryx Version 0.14.2-beta.471

---
 opteryx/__version__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/opteryx/__version__.py b/opteryx/__version__.py
index 0acf3f560..843f9e0dd 100644
--- a/opteryx/__version__.py
+++ b/opteryx/__version__.py
@@ -1,4 +1,4 @@
-__build__ = 470
+__build__ = 471
 
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.