Help with first python delta table creation on S3 #796
Replies: 2 comments 8 replies
-
Hi @wannabethere, There is a couple of things relevant here :). FIrst a disclaimer, we are working heavily on supporting writes to remote storages from the python bindings, but in general writing is an experimental feature, and remote storage is not yet really supported. Secondly, there is a rust crate build specifically for the purpose of ingesting data in delta from kafka ... https://github.com/delta-io/kafka-delta-ingest That being said, hopefully we can get a bit further... The setting |
Beta Was this translation helpful? Give feedback.
-
Any update on this and the ability to write to minio? Looks like it throws: from deltalake.writer import write_deltalake
storage_options = {
"AWS_ACCESS_KEY_ID": MINO_USER,
"AWS_SECRET_ACCESS_KEY": MINO_KEY,
"AWS_ENDPOINT_URL": MINO_ENDPOINT_URL,
"AWS_REGION": 'us-west-2',
}
df = pd.DataFrame({'x': [1, 2, 3]})
write_deltalake("s3://some_bucket", df, storage_options=storage_options)
|
Beta Was this translation helpful? Give feedback.
-
Hi, I am trying to use delta-rs for the first time to enable consuming Json from Kafka to write delta. Ours is a python shop. We are trying this out but it keeps failing with:
Traceback (most recent call last):
File "testdelta.py", line 72, in
existing_table(tmp_path=table_path,sample_data=sample_data())
File "testdelta.py", line 53, in existing_table
write_deltalake(path, sample_data)
File "/usr/local/lib/python3.8/site-packages/deltalake/writer.py", line 147, in write_deltalake
table = try_get_deltatable(table_or_uri)
File "/usr/local/lib/python3.8/site-packages/deltalake/writer.py", line 291, in try_get_deltatable
return DeltaTable(table_uri)
File "/usr/local/lib/python3.8/site-packages/deltalake/table.py", line 91, in init
self._table = RawDeltaTable(
deltalake.PyDeltaTableError: Failed to load checkpoint: Failed to read checkpoint content: Generic S3 error: Error performing get request testing/_delta_log/_last_checkpoint: response error "request error", after 0 retries: builder error for url (http://s3.us-west-2.amazonaws.com/bx-de-deltalake/testing/_delta_log/_last_checkpoint): URL scheme is not allowed
Here is the code:
`import os
import pathlib
from datetime import date, datetime, timedelta
from decimal import Decimal
import pyarrow as pa
from deltalake import DeltaTable, write_deltalake
from deltalake import DeltaTable
from deltalake.deltalake import DeltaStorageFsBackend
from deltalake.fs import DeltaStorageHandler
import boto3
table_path = "s3a://test-bucket/"
BUCKET = 'test-bucket'
def credentials():
os.environ["AWS_REGION"] = "local"
os.environ["AWS_ACCESS_KEY_ID"] = "XXXXXX"
os.environ["AWS_SECRET_ACCESS_KEY"] = "XXXXXX"
os.environ["AWS_ENDPOINT_URL"] = "http://s3.us-west-2.amazonaws.com" #"http://localhost:9000"
def sample_data():
nrows = 5
return pa.table(
{
"utf8": pa.array([str(x) for x in range(nrows)]),
"int64": pa.array(list(range(nrows)), pa.int64()),
"int32": pa.array(list(range(nrows)), pa.int32()),
"int16": pa.array(list(range(nrows)), pa.int16()),
"int8": pa.array(list(range(nrows)), pa.int8()),
"float32": pa.array([float(x) for x in range(nrows)], pa.float32()),
"float64": pa.array([float(x) for x in range(nrows)], pa.float64()),
"bool": pa.array([x % 2 == 0 for x in range(nrows)]),
"binary": pa.array([str(x).encode() for x in range(nrows)]),
"decimal": pa.array([Decimal("10.000") + x for x in range(nrows)]),
"date32": pa.array(
[date(2022, 1, 1) + timedelta(days=x) for x in range(nrows)]
),
"timestamp": pa.array(
[datetime(2022, 1, 1) + timedelta(hours=x) for x in range(nrows)]
),
"struct": pa.array([{"x": x, "y": str(x)} for x in range(nrows)]),
"list": pa.array([list(range(x + 1)) for x in range(nrows)]),
# NOTE: apache/arrow-rs#477
#'map': pa.array([[(str(y), y) for y in range(x)] for x in range(nrows)], pa.map_(pa.string(), pa.int64())),
}
)
def existing_table(tmp_path: str, sample_data: pa.Table):
path = tmp_path
write_deltalake(path, sample_data)
return DeltaTable(path)
if name == "main":
credentials()
handler = DeltaStorageHandler(table_path)
#dt = DeltaTable(table_path)
existing_table(tmp_path=table_path,sample_data=sample_data())`
Any ideas on how we can get past this error.
Beta Was this translation helpful? Give feedback.
All reactions