Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#2185 #2217

Merged
merged 2 commits into from
Jan 8, 2025
Merged

#2185 #2217

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion opteryx/__version__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__build__ = 967
__build__ = 968

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
30 changes: 15 additions & 15 deletions opteryx/connectors/iceberg_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,23 @@
from orso.schema import FlatColumn
from orso.schema import RelationSchema

from opteryx.connectors.base.base_connector import DEFAULT_MORSEL_SIZE
from opteryx.connectors import DiskConnector
from opteryx.connectors.base.base_connector import BaseConnector
from opteryx.shared import MaterializedDatasets
from opteryx.utils import arrow


class IcebergConnector(BaseConnector):
__mode__ = "Internal"
__mode__ = "Blob"
__type__ = "ARROW"

def __init__(self, *args, **kwargs):
def __init__(self, *args, catalog=None, io=DiskConnector, **kwargs):
BaseConnector.__init__(self, **kwargs)

self.dataset = self.dataset.lower()
self._datasets = MaterializedDatasets()
self.table = catalog.load_table(self.dataset)
self.io_connector = io(**kwargs)

def get_dataset_schema(self) -> RelationSchema:
dataset = self._datasets[self.dataset]
arrow_schema = dataset.schema
arrow_schema = self.table.schema().as_arrow()

self.schema = RelationSchema(
name=self.dataset,
Expand All @@ -41,12 +39,14 @@ def get_dataset_schema(self) -> RelationSchema:
return self.schema

def read_dataset(self, columns: list = None, **kwargs) -> pyarrow.Table:
dataset = self._datasets[self.dataset]
if columns is None:
column_names = self.schema.column_names
else:
column_names = [col.source_column for col in columns]

batch_size = DEFAULT_MORSEL_SIZE // (dataset.nbytes / dataset.num_rows)
reader = self.table.scan(
selected_fields=column_names,
).to_arrow_batch_reader()

for batch in dataset.to_batches(max_chunksize=batch_size):
morsel = pyarrow.Table.from_batches([batch], schema=dataset.schema)
if columns:
morsel = arrow.post_read_projector(morsel, columns)
yield morsel
for batch in reader:
yield pyarrow.Table.from_batches([batch])
30 changes: 28 additions & 2 deletions tests/catalog/test_iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@

BASE_PATH: str = "tmp/iceberg"

# this is how we get the raw list of files for the scan
# print([task.file.file_path for task in self.table.scan().plan_files()])



def set_up_iceberg():
"""
Set up a local Iceberg catalog for testing with NVD data.
Expand Down Expand Up @@ -70,10 +75,31 @@ def test_iceberg_basic():
},
)

opteryx.register_store("iceberg", IcebergConnector, io=DiskConnector)
opteryx.register_store("iceberg", IcebergConnector, catalog=catalog, io=DiskConnector)

table = catalog.load_table("iceberg.tweets")
table.scan().to_arrow()


@skip_if(is_arm() or is_windows() or is_mac())
def test_iceberg_get_schema():

from pyiceberg.catalog import load_catalog

set_up_iceberg()

catalog = load_catalog(
"default",
**{
"uri": f"sqlite:///{BASE_PATH}/pyiceberg_catalog.db",
"warehouse": f"file://{BASE_PATH}",
},
)

opteryx.register_store("iceberg", IcebergConnector, catalog=catalog, io=DiskConnector)

table = catalog.load_table("iceberg.tweets")
print(table.scan().to_arrow())
table.schema().as_arrow()


if __name__ == "__main__": # pragma: no cover
Expand Down
Loading