Skip to content

Commit

Permalink
Merge pull request #2214 from mabel-dev/#2213
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer authored Jan 8, 2025
2 parents e1815ff + a7057dd commit 8fc8926
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 19 deletions.
2 changes: 1 addition & 1 deletion opteryx/__version__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__build__ = 966
__build__ = 967

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
51 changes: 51 additions & 0 deletions opteryx/compiled/structures/buffers.pyx
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# distutils: language = c++
# cython: language_level=3
# cython: nonecheck=False
# cython: cdivision=True
# cython: initializedcheck=False
# cython: infer_types=True
# cython: wraparound=False
# cython: boundscheck=False

from libc.stdint cimport int64_t

import numpy
cimport numpy as cnp

cdef class IntBuffer:
    """
    A growable buffer of 64-bit integers backed by a NumPy array.

    Values are written into a pre-allocated ``int64`` array that doubles in
    capacity whenever it fills up. ``size`` is the number of valid entries;
    only the first ``size`` slots of ``_buffer`` hold meaningful data.

    Parameters:
        size_hint: int
            Initial number of slots to allocate. Values below 1 are clamped
            to 1 so the buffer can always grow by doubling.
    """
    cdef public int64_t[:] _buffer
    cdef public size_t size
    cdef public size_t capacity

    def __cinit__(self, size_hint: int = 1024):
        # A capacity of zero can never grow by doubling (0 * 2 == 0) and a
        # negative hint cannot be stored in a size_t, so clamp to one slot.
        self.capacity = size_hint if size_hint > 0 else 1
        self._buffer = numpy.zeros(self.capacity, dtype=numpy.int64)
        self.size = 0

    cpdef void append(self, int64_t value):
        """ Append an integer to the buffer, growing the storage if it is full. """
        cdef cnp.ndarray[int64_t, ndim=1] new_buffer
        if self.size == self.capacity:
            # `capacity` is a public attribute, so guard against it having been
            # set to zero externally; bounds checking is disabled for this
            # module, so a failed grow would become an out-of-bounds write.
            self.capacity = self.capacity * 2 if self.capacity > 0 else 1
            new_buffer = numpy.zeros(self.capacity, dtype=numpy.int64)
            # Copy only the valid prefix into the new storage.
            new_buffer[:self.size] = self._buffer[:self.size]
            self._buffer = new_buffer
        self._buffer[self.size] = value
        self.size += 1

    cpdef void extend(self, iterable):
        """ Extend the buffer with an iterable of integers. """
        # Iterate directly rather than by index so that generators and other
        # non-indexable iterables are accepted, as well as lists and arrays.
        for value in iterable:
            self.append(value)

    cpdef cnp.ndarray[int64_t, ndim=1] to_numpy(self):
        """
        Return the valid entries as a NumPy array without copying.

        The result wraps the buffer's current storage rather than owning a
        copy, so it should be consumed before the buffer is reused.
        """
        return numpy.asarray(self._buffer[:self.size])

    cpdef buffer(self):
        """ Return the valid entries as a typed memoryview slice without copying. """
        return self._buffer[:self.size]
14 changes: 8 additions & 6 deletions opteryx/operators/inner_join_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from opteryx import EOS
from opteryx.compiled.joins.inner_join import abs_hash_join_map
from opteryx.compiled.structures import hash_join_map
from opteryx.compiled.structures.buffers import IntBuffer
from opteryx.models import QueryProperties
from opteryx.utils.arrow import align_tables

Expand All @@ -52,8 +53,8 @@ def inner_join_with_preprocessed_left_side(left_relation, right_relation, join_c
Returns:
A tuple containing lists of matching row indices from the left and right relations.
"""
left_indexes = []
right_indexes = []
left_indexes = IntBuffer()
right_indexes = IntBuffer()

right_hash = hash_join_map(right_relation, join_columns)

Expand All @@ -62,11 +63,12 @@ def inner_join_with_preprocessed_left_side(left_relation, right_relation, join_c
if left_rows is None:
continue
for l in left_rows:
for r in right_rows:
left_indexes.append(l)
right_indexes.append(r)
left_indexes.extend([l] * len(right_rows))
right_indexes.extend(right_rows)

return align_tables(right_relation, left_relation, right_indexes, left_indexes)
return align_tables(
right_relation, left_relation, right_indexes.to_numpy(), left_indexes.to_numpy()
)


class InnerJoinNode(JoinNode):
Expand Down
21 changes: 9 additions & 12 deletions opteryx/operators/outer_join_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
popular SEMI and ANTI joins we leave to PyArrow for now.
"""

from collections import deque
from typing import List

import pyarrow

from opteryx import EOS
from opteryx.compiled.structures import HashTable
from opteryx.compiled.structures.buffers import IntBuffer
from opteryx.models import QueryProperties
from opteryx.third_party.abseil.containers import FlatHashMap
from opteryx.utils.arrow import align_tables
Expand Down Expand Up @@ -50,8 +50,8 @@ def left_join(left_relation, right_relation, left_columns: List[str], right_colu
from opteryx.compiled.joins.inner_join import abs_hash_join_map
from opteryx.compiled.structures.hash_table import hash_join_map

left_indexes: deque = deque()
right_indexes: deque = deque()
left_indexes = IntBuffer()
right_indexes = []

if len(set(left_columns) & set(right_relation.column_names)) > 0:
left_columns, right_columns = right_columns, left_columns
Expand All @@ -63,27 +63,24 @@ def left_join(left_relation, right_relation, left_columns: List[str], right_colu
right_rows = right_hash.get(hash_value)
if right_rows:
for l in left_rows:
for r in right_rows:
left_indexes.append(l)
right_indexes.append(r)
left_indexes.extend([l] * len(right_rows))
right_indexes.extend(right_rows)
else:
for l in left_rows:
left_indexes.append(l)
right_indexes.append(None)

if len(left_indexes) > 50_000:
if left_indexes.size > 50_000:
table = align_tables(
right_relation, left_relation, list(right_indexes), list(left_indexes)
right_relation, left_relation, right_indexes, left_indexes.to_numpy()
)
yield table
left_indexes.clear()
left_indexes.size = 0
right_indexes.clear()

# this may return an empty table each time - fix later
table = align_tables(right_relation, left_relation, list(right_indexes), list(left_indexes))
table = align_tables(right_relation, left_relation, right_indexes, left_indexes.to_numpy())
yield table
left_indexes.clear()
right_indexes.clear()


def full_join(left_relation, right_relation, left_columns: List[str], right_columns: List[str]):
Expand Down
7 changes: 7 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ def rust_build(setup_kwargs: Dict[str, Any]) -> None:
language="c++",
extra_compile_args=COMPILE_FLAGS + ["-std=c++17"],
),
Extension(
name="opteryx.compiled.structures.buffers",
sources=["opteryx/compiled/structures/buffers.pyx"],
include_dirs=include_dirs,
language="c++",
extra_compile_args=COMPILE_FLAGS + ["-std=c++17"],
),
Extension(
name="opteryx.compiled.structures.memory_pool",
sources=["opteryx/compiled/structures/memory_pool.pyx"],
Expand Down

0 comments on commit 8fc8926

Please sign in to comment.