Merge pull request #2120 from mabel-dev/#2100/4
#2100
joocer authored Dec 5, 2024
2 parents 59b7fc1 + 2d00e92 commit c9cec7b
Showing 31 changed files with 164 additions and 100 deletions.
12 changes: 6 additions & 6 deletions README.md
@@ -21,9 +21,7 @@ Download | https://pypi.org/project/opteryx/

## What is Opteryx?

Opteryx champions the SQL-on-everything approach, streamlining cross-platform data analytics by federating SQL queries across diverse data sources, including database systems like Postgres and datalake file formats like Parquet. The goal is to enhance your data analytics process by offering a unified way to access data from across your organization.

Opteryx is a Python library that combines elements of in-process database engines like SQLite and DuckDB with federative features found in systems like Presto and Trino. The result is a versatile tool for querying data across multiple data sources in a seamless fashion.
Opteryx is a Python library enabling SQL queries across diverse data sources such as Postgres, Parquet, and MongoDB. Opteryx champions the SQL-on-everything approach, streamlining cross-platform data analytics by federating SQL queries across those sources. The goal is to enhance your data analytics process by offering a unified way to access data from across your organization. Opteryx combines the simplicity of SQLite and DuckDB with the federated query capabilities found in Presto and Trino.
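
A minimal usage sketch (`$planets` is one of Opteryx's bundled sample datasets; treat the exact result handling as illustrative):

```python
import opteryx

# $planets ships with Opteryx; Postgres tables, Parquet files and other
# registered sources are queried through the same SQL interface
result = opteryx.query("SELECT name, diameter FROM $planets WHERE gravity > 5")
print(result)
```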

Opteryx offers the following features:

@@ -289,7 +287,6 @@ You can also try Opteryx right now using our [interactive labs](https://github.c
[![Discord](https://img.shields.io/badge/discuss%20on-discord-5865F2.svg?logo=discord)](https://discord.gg/qpv2tr989x)
[![Medium](https://img.shields.io/badge/Read%20on-Medium-5865F2.svg?logo=medium)](https://medium.com/opteryx)


**Get Involved**

🌟 **Star this repo** to show your support and help others discover Opteryx.
@@ -299,14 +296,15 @@ You can also try Opteryx right now using our [interactive labs](https://github.c
❤️ We welcome [sponsorships](https://github.com/sponsors/mabel-dev) of any size. Every contribution helps us make Opteryx even better!

We’re excited to have you join our journey. Let us know how we can help!

## Security

We take security seriously. If you find any weaknesses, please review our [Security Policy](https://github.com/mabel-dev/opteryx/blob/main/SECURITY.md) and let us know through our [reporting process](https://github.com/mabel-dev/opteryx/security/advisories/new).

[![Static Analysis](https://github.com/mabel-dev/opteryx/actions/workflows/static_analysis.yaml/badge.svg)](https://github.com/mabel-dev/opteryx/actions/workflows/static_analysis.yml)
[![Vulnerabilities](https://sonarcloud.io/api/project_badges/measure?project=mabel-dev_opteryx&metric=vulnerabilities)](https://sonarcloud.io/summary/new_code?id=mabel-dev_opteryx)
[![Security Rating](https://sonarcloud.io/api/project_badges/measure?project=mabel-dev_opteryx&metric=security_rating)](https://sonarcloud.io/summary/new_code?id=mabel-dev_opteryx)

See the project [Security Policy](SECURITY.md) for information about reporting vulnerabilities.

## License

[![License](https://img.shields.io/badge/license-Apache%202.0-blue.svg)](https://github.com/mabel-dev/opteryx/blob/master/LICENSE)
@@ -326,6 +324,8 @@ Opteryx is in beta. Beta means different things to different people; to us, being in beta means:
- Changes are focused on feature completion, bugs, performance, reducing debt, and security
- Code structure and APIs are not stable and may change

We’re actively adding features and improving performance.

## Related Projects

- **[orso](https://github.com/mabel-dev/orso)** DataFrame library
39 changes: 29 additions & 10 deletions SECURITY.md
@@ -1,25 +1,44 @@
# Security Policy

We take security seriously and appreciate your efforts to make Opteryx more secure.

## Supported Versions

The heads of the current and previous minor versions are supported for functional and security fixes. New features are only added to the latest version. Patch versions are not supported; fixes generally require the creation of a new patch version.
We support the current and previous minor versions for functional and security fixes. New features are only added to the latest version, while patch versions are created as needed for critical fixes.

| Version | Supported |
| ------- | ------------------ |
| 0.7 | :white_check_mark: |
| 0.8 | :white_check_mark: |
| <= 0.6 | :x: |
| 0.18 | :white_check_mark: |
| 0.17 | :white_check_mark: |
| <= 0.16 | :x: |

All features in supported versions are eligible for security fixes; however, features due for deprecation may be removed rather than fixed.
### Key Notes

Releases may be yanked from PyPI if they contain material bugs, including security flaws.
- Features due for deprecation may be removed rather than fixed.
- Releases containing material bugs or security vulnerabilities may be yanked from PyPI.
- To stay secure, we recommend using the latest version wherever possible.

## Reporting a Vulnerability

Thank you for helping to make Opteryx more secure - security weaknesses should be reported [via GitHub](https://github.com/mabel-dev/opteryx/security/advisories).
Thank you for helping to keep Opteryx secure! If you’ve discovered a potential vulnerability, please follow these steps:

1. **Submit a Report**: Vulnerabilities should be reported through [GitHub Security Advisories](https://github.com/mabel-dev/opteryx/security/advisories).
1. **Include Details**: To help us assess the issue quickly, please include:
- A description of the vulnerability
- Steps to reproduce it
- Affected versions
- Any known mitigations
1. **Expectations**: We aim to triage and respond within 7 days. If you haven’t heard back, feel free to follow up.

### Disclosure Timeline
- We follow a **90-day coordinated disclosure timeline** from the first contact, regardless of resolution status.
- Credit will be given to researchers unless anonymity is requested.

Please provide a description of the issue, the steps you took to create the issue, affected versions, and if known, mitigations for the issue.
## Scope of Security Issues

We will try to triage and respond to you within a week; if you do not get a response, please continue to get in touch - we appreciate your input, but we are a small development team and may not monitor communications continuously.
This policy covers vulnerabilities that may compromise:
- Data confidentiality, integrity, or availability
- System functionality or integrity
- Compliance with security standards

This project follows a 90 day disclosure timeline (from first contact) regardless of response or resolution.
We appreciate your cooperation in helping us maintain a secure and reliable system for the Opteryx community.
2 changes: 1 addition & 1 deletion opteryx/__version__.py
@@ -1,4 +1,4 @@
__build__ = 878
__build__ = 879

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
16 changes: 12 additions & 4 deletions opteryx/connectors/sql_connector.py
@@ -198,8 +198,6 @@ def read_dataset( # type:ignore
b = time.monotonic_ns()
morsel = DataFrame(schema=result_schema, rows=batch_rows).arrow()
convert_time += time.monotonic_ns() - b
yield morsel
at_least_once = True

# Dynamically adjust chunk size based on the data size, we start by downloading
# 500 records to get an idea of the row size, assuming these 500 are
@@ -213,6 +211,9 @@
self.chunk_size = min(self.chunk_size, 1000000) # cap at 1 million
# DEBUG: log (f"CHANGING CHUNK SIZE TO {self.chunk_size} was {INITIAL_CHUNK_SIZE}.")

yield morsel
at_least_once = True

if not at_least_once:
yield DataFrame(schema=result_schema).arrow()
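
The chunk-size adjustment above samples the first rows to estimate row width; a standalone sketch of that heuristic, with an assumed target batch size (these constants are illustrative, not Opteryx's actual values):

```python
INITIAL_CHUNK_SIZE = 500  # the connector samples this many rows first
TARGET_MORSEL_BYTES = 64 * 1024 * 1024  # assumed target; not the real constant

def next_chunk_size(sample_rows: int, sample_bytes: int) -> int:
    """Estimate rows per fetch so each batch lands near the target size."""
    if sample_rows == 0 or sample_bytes == 0:
        return INITIAL_CHUNK_SIZE
    avg_row_bytes = sample_bytes / sample_rows
    estimated = int(TARGET_MORSEL_BYTES / avg_row_bytes)
    estimated = max(1000, (estimated // 1000) * 1000)  # round down to whole thousands
    return min(estimated, 1_000_000)  # cap at 1 million, as the diff does
```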

@@ -236,9 +237,16 @@ def get_dataset_schema(self) -> RelationSchema:
name=column.name,
type=PYTHON_TO_ORSO_MAP[column.type.python_type],
precision=(
column.type.precision if column.type.precision is not None else 38
column.type.precision
if hasattr(column.type, "precision")
and column.type.precision is not None
else 38
),
scale=(
column.type.scale
if hasattr(column.type, "scale") and column.type.scale is not None
else 14
),
scale=(column.type.scale if column.type.scale is not None else 14),
nullable=column.nullable,
)
for column in table.columns
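
The hasattr guards above exist because some SQLAlchemy column types carry no precision or scale attributes at all; an equivalent, more compact sketch using getattr (the helper name is illustrative, the defaults mirror the diff):

```python
from typing import Any, Tuple

def decimal_properties(sql_type: Any) -> Tuple[int, int]:
    """Return (precision, scale), falling back to the diff's defaults."""
    precision = getattr(sql_type, "precision", None)
    scale = getattr(sql_type, "scale", None)
    return (
        precision if precision is not None else 38,
        scale if scale is not None else 14,
    )
```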
91 changes: 55 additions & 36 deletions opteryx/models/physical_plan.py
@@ -31,11 +31,14 @@
active_task_lock = Lock()
active_tasks: int = 0

CONCURRENT_WORKERS = 2


def active_tasks_increment(value: int):
global active_tasks
with active_task_lock:
active_tasks += value
print("AT", active_tasks)


class PhysicalPlan(Graph):
@@ -139,19 +142,11 @@ def mark_node_exhausted(node_id):
if isinstance(self[node_id], ReaderNode):
return

# Notify downstream nodes
downstream_nodes = self.outgoing_edges(node_id)
if len(downstream_nodes) > 1:
raise InvalidInternalStateError("Cannot FORK execution")
elif len(downstream_nodes) == 1:
_, downstream_node, _ = downstream_nodes[0]
# Check if all parents of downstream_node are exhausted
if all(
node_exhaustion[parent] for parent, _, _ in self.ingoing_edges(downstream_node)
):
work_queue.put((node_id, EOS)) # EOS signals exhaustion
active_tasks_increment(+1)
morsel_accounting[node_id] += 1
for _, _, join_leg in self.ingoing_edges(node_id):
# Queue the task for node with the correct join_leg
work_queue.put((node_id, join_leg, EOS)) # EOS signals exhaustion
active_tasks_increment(+1)
morsel_accounting[node_id] += 1
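
mark_node_exhausted now queues one EOS per ingoing leg rather than a single downstream signal; a self-contained sketch of that sentinel pattern (the queue layout is taken from the diff, the helper name is illustrative):

```python
from queue import Queue

EOS = object()  # end-of-stream sentinel

def signal_exhaustion(work_queue: Queue, node_id: str, ingoing_legs: list) -> None:
    # one EOS per leg lets a join flush, say, its build side while
    # the probe side may still be streaming
    for leg in ingoing_legs:
        work_queue.put((node_id, leg, EOS))

work_queue: Queue = Queue()
signal_exhaustion(work_queue, "join-1", ["left", "right"])
```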

def update_morsel_accounting(node_id, morsel_count_change: int):
"""
@@ -166,25 +161,28 @@
"""
with morsel_lock:
morsel_accounting[node_id] += morsel_count_change
# print(
# "ACCOUNT",
# node_id,
# morsel_accounting[node_id],
# morsel_count_change,
# self[node_id].name,
# )
print(
"ACCOUNT",
node_id,
morsel_accounting[node_id],
morsel_count_change,
self[node_id].name,
)

if morsel_accounting[node_id] < 0:
raise InvalidInternalStateError("Node input and output count in invalid state.")

# Check if the node is exhausted
if morsel_accounting[node_id] == 0: # No more pending morsels for this node
# Ensure all parent nodes are exhausted
all_parents_exhausted = all(
node_exhaustion[parent] for parent, _, _ in self.ingoing_edges(node_id)
all_providers_exhausted = all(
node_exhaustion[provider] for provider, _, _ in self.ingoing_edges(node_id)
)
if all_parents_exhausted:
if all_providers_exhausted:
print("providers exhausted", node_exhaustion)
mark_node_exhausted(node_id)
else:
print("providers not exhausted", node_exhaustion)

if not self.is_acyclic():
raise InvalidInternalStateError("Query plan is cyclic, cannot execute.")
@@ -198,6 +196,18 @@ def update_morsel_accounting(node_id, morsel_count_change: int):
if head_node is None:
head_node = self[head_nodes[0]]

# add the left/right labels to the edges coming into the joins
joins = [(nid, node) for nid, node in self.nodes(True) if isinstance(node, JoinNode)]
for nid, join in joins:
for s, t, r in self.breadth_first_search(nid, reverse=True):
source_relations = self[s].parameters.get("all_relations", set())
if set(join._left_relation).intersection(source_relations):
self.remove_edge(s, t, r)
self.add_edge(s, t, "left")
elif set(join._right_relation).intersection(source_relations):
self.remove_edge(s, t, r)
self.add_edge(s, t, "right")
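
The relabelling above walks upstream from each join and tags edges by relation provenance; a compact sketch of just that membership test (the function name and set shapes are illustrative):

```python
from typing import Optional, Set

def classify_leg(source_relations: Set[str], left: Set[str], right: Set[str]) -> Optional[str]:
    """Label an edge by which join input its source feeds."""
    if left & source_relations:
        return "left"
    if right & source_relations:
        return "right"
    return None  # this edge does not feed the join directly

print(classify_leg({"orders"}, left={"orders"}, right={"customers"}))  # -> left
```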

# Special case handling for 'Explain' queries
if isinstance(head_node, ExplainNode):
yield self.explain(head_node.analyze), ResultType.TABULAR
@@ -222,13 +232,13 @@ def worker_process():
if task is None:
break

node_id, morsel = task
node_id, join_leg, morsel = task
operator = self[node_id]
results = operator(morsel)
results = operator(morsel, join_leg)

for result in results:
# Send results back to the response queue
response_queue.put((node_id, result))
response_queue.put((node_id, join_leg, result))

update_morsel_accounting(node_id, -1)
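
Worker tasks grew from (node_id, morsel) to (node_id, join_leg, morsel); a stripped-down sketch of the loop contract (the operator lookup is assumed to be a plain dict of callables here):

```python
from queue import Queue
from typing import Callable, Dict

def worker(work_queue: Queue, response_queue: Queue, operators: Dict[str, Callable]) -> None:
    while True:
        task = work_queue.get()
        if task is None:  # shutdown signal
            break
        node_id, join_leg, morsel = task  # the new three-field task tuple
        for result in operators[node_id](morsel, join_leg):
            response_queue.put((node_id, join_leg, result))
```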

@@ -255,31 +265,38 @@ def inner_execute(plan):

# Main engine loop processes pump nodes and coordinates work
for pump_nid, pump_instance in pump_nodes:
for morsel in pump_instance(None):
for morsel in pump_instance(None, None):
# Initial morsels pushed to the work queue determine downstream operators
next_nodes = [target for _, target, _ in self.outgoing_edges(pump_nid)]
for downstream_node in next_nodes:
next_nodes = [
(target, join_leg)
for _, target, join_leg in self.outgoing_edges(pump_nid)
]
for downstream_node, join_leg in next_nodes:
# DEBUG: log (f"following initial {self[pump_nid].name} triggering {self[downstream_node].name}")
# Queue tasks for downstream operators
work_queue.put((downstream_node, morsel))
work_queue.put((downstream_node, join_leg, morsel))
active_tasks_increment(+1)
update_morsel_accounting(downstream_node, +1)

# Pump is exhausted after emitting all morsels
print("pump exhausted", pump_nid)
mark_node_exhausted(pump_nid)

# Process results from the response queue
def should_stop():
all_nodes_exhausted = all(node_exhaustion.values())
queues_empty = work_queue.empty() and response_queue.empty()
all_nodes_inactive = active_tasks <= 0
return all_nodes_exhausted and queues_empty and all_nodes_inactive
return all_nodes_exhausted and all_nodes_inactive

while not should_stop():
# Wait for results from workers
print(active_tasks)
print("*", end="", flush=True)
try:
node_id, result = response_queue.get(timeout=0.1)
node_id, join_leg, result = response_queue.get(timeout=0.1)
print("-", end="")
except Empty:
print(".", end="")
continue

# if a thread threw an error, we receive it here in the main thread
@@ -293,18 +310,20 @@ def should_stop():
continue

# Determine downstream operators
downstream_nodes = [target for _, target, _ in self.outgoing_edges(node_id)]
downstream_nodes = [
(target, join_leg) for _, target, join_leg in self.outgoing_edges(node_id)
]
if len(downstream_nodes) == 0: # Exit node
if result is not None:
yield result # Emit the morsel immediately
active_tasks_increment(-1) # Mark the task as completed
continue

for downstream_node in downstream_nodes:
for downstream_node, join_leg in downstream_nodes:
# Queue tasks for downstream operators
active_tasks_increment(+1)
# DEBUG: log (f"following {self[node_id].name} triggering {self[downstream_node].name}")
work_queue.put((downstream_node, result))
work_queue.put((downstream_node, join_leg, result))
update_morsel_accounting(downstream_node, +1)

# decrement _after_ we've done the work related to handling the task
2 changes: 1 addition & 1 deletion opteryx/operators/aggregate_and_group_node.py
@@ -104,7 +104,7 @@ def config(self): # pragma: no cover
def name(self): # pragma: no cover
return "Group"

def execute(self, morsel: pyarrow.Table):
def execute(self, morsel: pyarrow.Table, **kwargs):
if morsel == EOS:
# merge all the morsels together into one table, selecting only the columns
# we're pretty sure we're going to use - this will fail for datasets
2 changes: 1 addition & 1 deletion opteryx/operators/aggregate_node.py
@@ -218,7 +218,7 @@ def config(self): # pragma: no cover
def name(self): # pragma: no cover
return "Aggregation"

def execute(self, morsel: pyarrow.Table) -> pyarrow.Table:
def execute(self, morsel: pyarrow.Table, **kwargs) -> pyarrow.Table:
if morsel == EOS:
if _is_count_star(self.aggregates):
yield _count_star(
2 changes: 1 addition & 1 deletion opteryx/operators/async_read_node.py
@@ -84,7 +84,7 @@ def __init__(self, properties: QueryProperties, **parameters):
def from_dict(cls, dic: dict) -> "AsyncReaderNode": # pragma: no cover
raise NotImplementedError()

def execute(self, morsel) -> Generator:
def execute(self, morsel, **kwargs) -> Generator:
from opteryx import system_statistics

"""Perform this step, time how long is spent doing work"""
4 changes: 2 additions & 2 deletions opteryx/operators/base_plan_node.py
@@ -89,14 +89,14 @@ def __str__(self) -> str:
def execute(self, morsel: pyarrow.Table) -> Optional[pyarrow.Table]: # pragma: no cover
pass

def __call__(self, morsel: pyarrow.Table) -> Optional[pyarrow.Table]:
def __call__(self, morsel: pyarrow.Table, join_leg: str) -> Optional[pyarrow.Table]:
if hasattr(morsel, "num_rows"):
self.records_in += morsel.num_rows
self.bytes_in += morsel.nbytes
self.calls += 1

# set up the execution of the operator
generator = self.execute(morsel)
generator = self.execute(morsel, join_leg=join_leg)

while True:
try:
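
Since __call__ now always forwards join_leg into execute, single-input operators absorb it via **kwargs while joins consume it explicitly; a hedged sketch of both shapes (class names are illustrative, not the real operators):

```python
import pyarrow

class PassThroughSketch:
    """Illustrative single-input operator; ignores which leg fed it."""

    def execute(self, morsel: pyarrow.Table, **kwargs):
        yield morsel

class SidedJoinSketch:
    """Illustrative two-input operator; routes morsels by leg."""

    def __init__(self):
        self.buffers = {"left": [], "right": []}

    def execute(self, morsel: pyarrow.Table, join_leg: str):
        self.buffers[join_leg].append(morsel)
        yield None  # a real join would emit joined morsels
```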
2 changes: 1 addition & 1 deletion opteryx/operators/cross_join_node.py
@@ -323,7 +323,7 @@ def config(self): # pragma: no cover
filters = f"({self._unnest_target.name} IN ({', '.join(self._filters)}))"
return f"CROSS JOIN {filters}"

def execute(self, morsel: pyarrow.Table) -> pyarrow.Table:
def execute(self, morsel: pyarrow.Table, join_leg: str) -> pyarrow.Table:
if not self.continue_executing:
yield None
return
