Skip to content

Commit

Permalink
Add support for Spark iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
ilias1111 authored Aug 30, 2024
1 parent f594dbd commit cb18027
Show file tree
Hide file tree
Showing 20 changed files with 787 additions and 56 deletions.
58 changes: 46 additions & 12 deletions .github/workflows/pr_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ jobs:
# Run tests from integration_tests sub dir
working-directory: ./integration_tests
strategy:
fail-fast: false
matrix:
dbt_version: ["1.*"]
warehouse: ["postgres", "bigquery", "snowflake", "databricks", "redshift"] # TODO: Add RS self-hosted runner
warehouse: ["postgres", "bigquery", "snowflake", "databricks", "redshift", "spark_iceberg"] # TODO: Add RS self-hosted runner
services:
postgres:
image: postgres:latest
Expand All @@ -82,7 +83,26 @@ jobs:
steps:
- name: Check out
uses: actions/checkout@v3

- name: Configure Docker credentials
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: eu-west-1
- name: Set warehouse variables
id: set_warehouse
run: |
WAREHOUSE_PLATFORM=$(echo ${{ matrix.warehouse }} | cut -d'_' -f1)
WAREHOUSE_SPECIFIC=$(echo ${{ matrix.warehouse }} | cut -s -d'_' -f2)
echo "WAREHOUSE_PLATFORM=${WAREHOUSE_PLATFORM}" >> $GITHUB_ENV
echo "WAREHOUSE_SPECIFIC=${WAREHOUSE_SPECIFIC}" >> $GITHUB_ENV
echo "warehouse_platform=${WAREHOUSE_PLATFORM}" >> $GITHUB_OUTPUT
echo "warehouse_specific=${WAREHOUSE_SPECIFIC}" >> $GITHUB_OUTPUT
# Remove '*' and replace '.' with '_' in DBT_VERSION & set as SCHEMA_SUFFIX.
# SCHEMA_SUFFIX allows us to run multiple versions of dbt in parallel without overwriting the output tables
- name: Set SCHEMA_SUFFIX env
Expand All @@ -92,7 +112,7 @@ jobs:

- name: Set DEFAULT_TARGET env
run: |
echo "DEFAULT_TARGET=${{ matrix.warehouse }}" >> $GITHUB_ENV
echo "DEFAULT_TARGET=${{matrix.warehouse}}" >> $GITHUB_ENV
- name: Python setup
uses: actions/setup-python@v4
Expand All @@ -103,32 +123,46 @@ jobs:
uses: actions/cache@v3
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{ matrix.warehouse }}
key: ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{env.WAREHOUSE_PLATFORM}}
restore-keys: |
${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{ matrix.warehouse }}
${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{env.WAREHOUSE_PLATFORM}}
# Install latest patch version. Upgrade if cache contains old patch version.
- name: Install dependencies
run: |
pip install wheel setuptools
pip install -Iv dbt-${{ matrix.warehouse }}==${{ matrix.dbt_version }} --upgrade
pip install -Iv dbt-${{env.WAREHOUSE_PLATFORM}}==${{ matrix.dbt_version }} --upgrade
dbt deps
if: ${{matrix.warehouse != 'spark'}}
if: ${{env.WAREHOUSE_PLATFORM != 'spark'}}

- name: Install spark dependencies
run: |
pip install --upgrade pip wheel setuptools
pip install -Iv "dbt-${{ matrix.warehouse }}[ODBC]"==${{ matrix.dbt_version }} --upgrade
pip install -Iv "dbt-${{ env.WAREHOUSE_PLATFORM }}[PyHive]"==${{ matrix.dbt_version }} --upgrade
dbt deps
if: ${{matrix.warehouse == 'spark'}}
if: ${{env.WAREHOUSE_PLATFORM == 'spark'}}

- name: Install Docker Compose
run: |
sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
- name: Build and start Spark cluster
working-directory: .github/workflows/spark_deployment
run: |
docker-compose up -d
echo "Waiting for Spark services to start..."
sleep 90
if: ${{env.WAREHOUSE_PLATFORM == 'spark'}}

- name: "Pre-test: Drop ci schemas"
run: |
dbt run-operation post_ci_cleanup --target ${{ matrix.warehouse }}
dbt run-operation post_ci_cleanup --target ${{matrix.warehouse}}
- name: Run tests
run: ./.scripts/integration_tests.sh -d ${{ matrix.warehouse }}
run: ./.scripts/integration_tests.sh -d ${{matrix.warehouse}}

- name: "Post-test: Drop ci schemas"
run: |
dbt run-operation post_ci_cleanup --target ${{ matrix.warehouse }}
dbt run-operation post_ci_cleanup --target ${{matrix.warehouse}}
34 changes: 34 additions & 0 deletions .github/workflows/spark_deployment/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
FROM openjdk:11-jre-slim

# Set environment variables
ENV SPARK_VERSION=3.5.1
ENV HADOOP_VERSION=3.3.4
ENV ICEBERG_VERSION=1.4.2
ENV AWS_SDK_VERSION=1.12.581

# Install necessary tools
RUN apt-get update && apt-get install -y curl wget procps rsync ssh

# Download and install Spark
RUN wget https://downloads.apache.org/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz && \
tar -xvzf spark-${SPARK_VERSION}-bin-hadoop3.tgz && \
mv spark-${SPARK_VERSION}-bin-hadoop3 /spark && \
rm spark-${SPARK_VERSION}-bin-hadoop3.tgz

# Set Spark environment variables
ENV SPARK_HOME=/spark
ENV PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin

# Download necessary JARs
RUN mkdir -p /spark/jars && \
wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/${ICEBERG_VERSION}/iceberg-spark-runtime-3.5_2.12-${ICEBERG_VERSION}.jar -O /spark/jars/iceberg-spark-runtime.jar && \
wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar -O /spark/jars/iceberg-aws-bundle.jar && \
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar -O /spark/jars/hadoop-aws.jar && \
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/${AWS_SDK_VERSION}/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar -O /spark/jars/aws-java-sdk-bundle.jar

# Create directory for Spark events
RUN mkdir -p /tmp/spark-events

WORKDIR /spark

CMD ["bash"]
20 changes: 20 additions & 0 deletions .github/workflows/spark_deployment/build_and_push.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/bin/bash

# Set variables
DOCKER_HUB_ORG="snowplow"
IMAGE_NAME="spark-s3-iceberg"
TAG="latest"

# Build the image
echo "Building Docker image..."
docker build --platform linux/amd64 -t $DOCKER_HUB_ORG/$IMAGE_NAME:$TAG .

# Log in to Docker Hub
echo "Logging in to Docker Hub..."
docker login

# Push the image to Docker Hub
echo "Pushing image to Docker Hub..."
docker push $DOCKER_HUB_ORG/$IMAGE_NAME:$TAG

echo "Image successfully built and pushed to Docker Hub"
66 changes: 66 additions & 0 deletions .github/workflows/spark_deployment/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
version: '3'

networks:
spark-network:
driver: bridge

services:
spark-master:
image: snowplow/spark-s3-iceberg:latest
command: ["/bin/bash", "-c", "/spark/sbin/start-master.sh -h spark-master --properties-file /spark/conf/spark-defaults.conf && tail -f /spark/logs/spark--org.apache.spark.deploy.master.Master-1-*.out"]
hostname: spark-master
ports:
- '8080:8080'
- '7077:7077'
environment:
- SPARK_LOCAL_IP=spark-master
- SPARK_MASTER_HOST=spark-master
- SPARK_MASTER_PORT=7077
- SPARK_MASTER_OPTS="-Dspark.driver.memory=2g"
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
- AWS_REGION=eu-west-1
- AWS_DEFAULT_REGION=eu-west-1
volumes:
- ./spark-defaults.conf:/spark/conf/spark-defaults.conf
networks:
- spark-network

spark-worker:
image: snowplow/spark-s3-iceberg:latest
command: ["/bin/bash", "-c", "sleep 10 && /spark/sbin/start-worker.sh spark://spark-master:7077 --properties-file /spark/conf/spark-defaults.conf && tail -f /spark/logs/spark--org.apache.spark.deploy.worker.Worker-*.out"]
depends_on:
- spark-master
environment:
- SPARK_WORKER_CORES=2
- SPARK_WORKER_MEMORY=4G
- SPARK_EXECUTOR_MEMORY=3G
- SPARK_LOCAL_IP=spark-worker
- SPARK_MASTER=spark://spark-master:7077
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
- AWS_REGION=eu-west-1
- AWS_DEFAULT_REGION=eu-west-1
volumes:
- ./spark-defaults.conf:/spark/conf/spark-defaults.conf
networks:
- spark-network

thrift-server:
image: snowplow/spark-s3-iceberg:latest
command: ["/bin/bash", "-c", "sleep 30 && /spark/sbin/start-thriftserver.sh --master spark://spark-master:7077 --driver-memory 2g --executor-memory 3g --hiveconf hive.server2.thrift.port=10000 --hiveconf hive.server2.thrift.bind.host=0.0.0.0 --conf spark.sql.hive.thriftServer.async=true --conf spark.sql.hive.thriftServer.workerQueue.size=2000 --conf spark.sql.hive.thriftServer.maxWorkerThreads=100 --conf spark.sql.hive.thriftServer.minWorkerThreads=50 && tail -f /spark/logs/spark--org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-*.out"]
ports:
- '10000:10000'
depends_on:
- spark-master
- spark-worker
environment:
- SPARK_LOCAL_IP=thrift-server
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
- AWS_REGION=eu-west-1
- AWS_DEFAULT_REGION=eu-west-1
volumes:
- ./spark-defaults.conf:/spark/conf/spark-defaults.conf
networks:
- spark-network
44 changes: 44 additions & 0 deletions .github/workflows/spark_deployment/spark-defaults.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
spark.master spark://spark-master:7077

spark.sql.warehouse.dir s3a://dbt-spark-iceberg/github-integration-testing
spark.sql.catalog.glue org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.glue.catalog-impl org.apache.iceberg.aws.glue.GlueCatalog
spark.sql.catalog.glue.warehouse s3a://dbt-spark-iceberg/github-integration-testing
spark.sql.catalog.glue.io-impl org.apache.iceberg.aws.s3.S3FileIO
spark.sql.defaultCatalog glue
spark.sql.catalog.glue.database dbt-spark-iceberg

spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.access.key <AWS_ACCESS_KEY_ID>
spark.hadoop.fs.s3a.secret.key <AWS_SECRET_ACCESS_KEY>
spark.hadoop.fs.s3a.endpoint s3.eu-west-1.amazonaws.com
spark.hadoop.fs.s3a.path.style.access true
spark.hadoop.fs.s3a.region eu-west-1
spark.hadoop.fs.s3a.aws.region eu-west-1

# Enabling AWS SDK V4 signing (required for regions launched after January 2014)
spark.hadoop.com.amazonaws.services.s3.enableV4 true
spark.hadoop.fs.s3a.aws.credentials.provider org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider

# Hive Metastore Configuration (using AWS Glue)
spark.hadoop.hive.metastore.client.factory.class com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory

# Thrift Server Configuration for better performance in concurrent environments
spark.sql.hive.thriftServer.singleSession false
spark.sql.hive.thriftServer.async true
# spark.sql.hive.thriftServer.maxWorkerThreads 100
# spark.sql.hive.thriftServer.minWorkerThreads 50
# spark.sql.hive.thriftServer.workerQueue.size 2000

# Memory and Performance Tuning
# spark.driver.memory 2g
# spark.executor.memory 3g
# spark.worker.memory 4g
spark.network.timeout 600s
spark.sql.broadcastTimeout 600s
spark.sql.adaptive.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer

# Logging and Debugging
spark.eventLog.enabled true
spark.eventLog.dir /tmp/spark-events
2 changes: 1 addition & 1 deletion integration_tests/.scripts/integration_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ do
esac
done

declare -a SUPPORTED_DATABASES=("bigquery" "databricks" "postgres" "redshift" "snowflake")
declare -a SUPPORTED_DATABASES=("bigquery" "databricks" "postgres" "redshift" "snowflake", "spark_iceberg")

# set to lower case
DATABASE="$(echo $DATABASE | tr '[:upper:]' '[:lower:]')"
Expand Down
21 changes: 11 additions & 10 deletions integration_tests/ci/profiles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ integration_tests:
token_uri: "{{ env_var('BIGQUERY_SERVICE_TOKEN_URI') }}"
auth_provider_x509_cert_url: "{{ env_var('BIGQUERY_SERVICE_AUTH_PROVIDER_X509_CERT_URL') }}"
client_x509_cert_url: "{{ env_var('BIGQUERY_SERVICE_CLIENT_X509_CERT_URL') }}"

snowflake:
type: snowflake
account: "{{ env_var('SNOWFLAKE_TEST_ACCOUNT') }}"
Expand All @@ -58,20 +57,22 @@ integration_tests:
warehouse: "{{ env_var('SNOWFLAKE_TEST_WAREHOUSE') }}"
schema: "gh_sp_utils_dbt_{{ env_var('SCHEMA_SUFFIX') }}"
threads: 4

databricks:
type: databricks
schema: "gh_sp_utils_dbt_{{ env_var('SCHEMA_SUFFIX') }}"
host: "{{ env_var('DATABRICKS_TEST_HOST') }}"
http_path: "{{ env_var('DATABRICKS_TEST_HTTP_PATH') }}"
token: "{{ env_var('DATABRICKS_TEST_TOKEN') }}"
threads: 4

spark:
spark_iceberg:
type: spark
schema: "gh_sp_utils_dbt_{{ env_var('SCHEMA_SUFFIX') }}"
host: "{{ env_var('DATABRICKS_TEST_HOST') }}"
http_path: "{{ env_var('DATABRICKS_TEST_HTTP_PATH') }}"
token: "{{ env_var('DATABRICKS_TEST_TOKEN') }}"
endpoint: "{{ env_var('DATABRICKS_TEST_ENDPOINT') }}"
threads: 4
method: thrift
host: "{{ env_var('SPARK_MASTER_HOST', 'localhost') }}"
port: 10000
user: "{{ env_var('SPARK_USER', 'spark') }}"
schema: "{{ env_var('SPARK_SCHEMA', 'default') }}"
connect_retries: 5
connect_timeout: 60
threads: 1
vars:
snowplow__datalake_file_format: iceberg
Loading

0 comments on commit cb18027

Please sign in to comment.