diff --git a/.github/workflows/webapp-deploy.yaml b/.github/workflows/webapp-deploy.yaml new file mode 100644 index 00000000..6ac5d83d --- /dev/null +++ b/.github/workflows/webapp-deploy.yaml @@ -0,0 +1,108 @@ +# Copyright (c) Microsoft Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Docs for the Azure Web Apps Deploy action: https://github.com/Azure/webapps-deploy +# More GitHub Actions for Azure: https://github.com/Azure/actions +# More info on Python, GitHub Actions, and Azure App Service: https://aka.ms/python-webapps-actions + +name: Build and deploy Web App - lst-bench + +on: + push: + paths: + - metrics/** + - run/** + branches: + - main + workflow_dispatch: + +permissions: + contents: read + +env: + AZURE_WEBAPP_NAME: lst-bench + WORKING_DIRECTORY: './metrics/app' + STARTUP_COMMAND: 'python -m streamlit run main.py --server.port 8000 --server.address 0.0.0.0 --client.toolbarMode minimal' + +jobs: + build: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: 'Set up Python version' + uses: actions/setup-python@v1 + with: + python-version: '3.11' + + - name: 'Create and start virtual environment' + working-directory: ${{ env.WORKING_DIRECTORY }} + run: | + python -m venv venv + source venv/bin/activate + + - name: 'Install dependencies' + working-directory: ${{ env.WORKING_DIRECTORY }} + run: | + pip install setuptools + pip install -r requirements.txt + + - name: 'Copy .duckdb files from ./run/' + run: | + find ./run -type f -name "*.duckdb" -exec cp {} ${{ env.WORKING_DIRECTORY }} \; + + - name: Zip artifact for deployment + working-directory: ${{ env.WORKING_DIRECTORY }} + run: zip release.zip ./* -r + + - name: Upload artifact for deployment jobs + uses: actions/upload-artifact@v3 + with: + name: python-app + path: | + ${{ env.WORKING_DIRECTORY }}/release.zip + + deploy: + runs-on: ubuntu-latest + needs: build + environment: + name: 'webapp-deploy' + url: ${{ steps.deploy-to-webapp.outputs.webapp-url }} + permissions: + id-token: write #This is required for requesting the JWT + + steps: + - name: Download artifact from build job + uses: actions/download-artifact@v3 + with: + name: python-app + path: . 
+ + - name: Unzip artifact for deployment + run: unzip release.zip + + - name: Login to Azure + uses: azure/login@v1 + with: + client-id: ${{ secrets.AZURE_CLIENT_ID }} + tenant-id: ${{ secrets.AZURE_TENANT_ID }} + subscription-id: ${{ secrets.AZURE_SUBSCRIPTION_ID }} + + - name: 'Deploy to Azure Web App' + uses: azure/webapps-deploy@v3 + id: deploy-to-webapp + with: + app-name: ${{ env.AZURE_WEBAPP_NAME }} + startup-command: ${{ env.STARTUP_COMMAND }} diff --git a/metrics/app/README.md b/metrics/app/README.md new file mode 100644 index 00000000..e61e059c --- /dev/null +++ b/metrics/app/README.md @@ -0,0 +1,72 @@ + + +# LST-Bench: Dashboard + +**Dashboard:** [https://lst-bench.azurewebsites.net/](https://lst-bench.azurewebsites.net/) + +The LST-Bench dashboard is powered by [Streamlit](https://github.com/streamlit/streamlit) and deployed to Azure App Service through GitHub actions. +You can find the deployment workflow [here](/.github/workflows/webapp-deploy.yaml). +The dashboard provides insights derived from metrics collected from LST-Bench, including execution time and degradation rate. + +## Evaluation +The results displayed in the dashboard are specific to the versions and configurations we tested. +Their performance is subject to change and improvement through further tuning and future developments. +Thus, the primary aim of sharing them is not to assert that one LST or engine is superior (in terms of speed, cost, etc.) to another. +Instead, it is to showcase LST-Bench's capability in quantifying significant trade-offs across various combinations of engines and LSTs. +Further details about the runs and setups are available [here](/run). + +## Adding a New Result +To include data from a new system, duplicate one of the directories in the [run folder](/run) and modify the necessary files within. +For a deeper understanding of the directory structure, consult the [README file](/run/README.md). +The LST-Bench dashboard web app automatically retrieves results from the .duckdb files within those folders and displays them on the dashboard. + +## Dashboard Development +To run the LST-Bench dashboard locally and test your changes, follow these steps: + +### 1. Set up Python version +Ensure you have Python version 3.11 installed on your system. If not, you can download and install it from the official Python website. + +### 2. Create and Start Virtual Environment +To isolate the dependencies of the LST-Bench dashboard, it's recommended to use a virtual environment. You can create one by running the following command in your terminal: + +```bash +python -m venv venv +``` + +Once the virtual environment is created, activate it by executing: + +```bash +source venv/bin/activate +``` + +### 3. Install Dependencies +Install the the necessary packages specified in the requirements.txt using pip: + +```bash +pip install -r requirements.txt +``` + +### 4. Execute Streamlit App +With the dependencies installed, you can now start the Streamlit app by running the following command: + +```bash +python -m streamlit run main.py +``` + +This command will launch the LST-Bench dashboard locally in your browser. diff --git a/metrics/app/main.py b/metrics/app/main.py new file mode 100644 index 00000000..aba25d25 --- /dev/null +++ b/metrics/app/main.py @@ -0,0 +1,370 @@ +# Copyright (c) Microsoft Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import altair as alt +import collections +import duckdb +import logging +import pandas as pd +import os +import streamlit as st +import utils + + +@st.cache_resource +def get_connection(): + connection = duckdb.connect() + # Get databases and attach them + databases_list = [] + + # Function to recursively find DuckDB files in a directory + def find_duckdb_files(directory: str) -> collections.abc.Iterator[str]: + for root, dirs, files in os.walk(directory): + for file in files: + if file.endswith('.duckdb'): + yield os.path.join(root, file) + + # Combine the results of the current directory (used when deployed) + # and find_duckdb_files('../../run/') (used when developing) + for database_path in list(find_duckdb_files('./')) + list(find_duckdb_files('../../run/')): + database = os.path.basename(database_path)[:-3] + connection.execute(f"ATTACH DATABASE '{database_path}' AS \"{database}\" (READ_ONLY)") + databases_list.append(database) + # Create view encompassing all experiments + union_sql = " UNION ".join([f"SELECT * FROM \"{database}\".experiment_telemetry" for database in databases_list]) + connection.execute(f"CREATE VIEW combined_experiment_telemetry AS {union_sql}") + return connection + + +@st.cache_data +def get_systems(): + connection = get_connection() + df = connection.execute( + f""" + SELECT DISTINCT concat_ws('-', json(event_data)->>'system', json(event_data)->>'system_version') AS system + FROM combined_experiment_telemetry + WHERE event_type = 'EXEC_EXPERIMENT' AND event_status='SUCCESS' AND NOT(event_id LIKE 'setup%') + ORDER BY system ASC; + """ + ).df() + return df['system'] + + +@st.cache_data +def get_table_formats(): + connection = get_connection() + df = connection.execute( + f""" + SELECT DISTINCT concat_ws('-', json(event_data)->>'table_format', json(event_data)->>'table_format_version') AS table_format + FROM combined_experiment_telemetry + WHERE event_type = 'EXEC_EXPERIMENT' AND event_status='SUCCESS' AND NOT(event_id LIKE 'setup%') + ORDER BY table_format ASC; + """ + ).df() + return df['table_format'] + + +@st.cache_data +def get_modes(): + connection = get_connection() + df = connection.execute( + f""" + SELECT DISTINCT json(event_data)->>'mode' AS mode + FROM combined_experiment_telemetry + WHERE event_type = 'EXEC_EXPERIMENT' AND event_status='SUCCESS' AND NOT(event_id LIKE 'setup%') + ORDER BY mode ASC; + """ + ).df() + return df['mode'] + + +@st.cache_data +def get_cluster_sizes(): + connection = get_connection() + df = connection.execute( + f""" + SELECT DISTINCT json(event_data)->>'cluster_size' AS cluster_size + FROM combined_experiment_telemetry + WHERE event_type = 'EXEC_EXPERIMENT' AND event_status='SUCCESS' AND NOT(event_id LIKE 'setup%') + ORDER BY cluster_size ASC; + """ + ).df() + return df['cluster_size'] + + +@st.cache_data +def get_machines(): + connection = get_connection() + df = connection.execute( + f""" + SELECT DISTINCT json(event_data)->>'machine' AS machine + FROM combined_experiment_telemetry + WHERE event_type = 'EXEC_EXPERIMENT' AND event_status='SUCCESS' AND NOT(event_id LIKE 'setup%') + ORDER BY machine ASC; + """ + 
).df() + return df['machine'] + + +@st.cache_data +def get_workloads(): + connection = get_connection() + df = connection.execute( + f""" + SELECT DISTINCT event_id AS workload + FROM combined_experiment_telemetry + WHERE event_type = 'EXEC_EXPERIMENT' AND event_status='SUCCESS' AND NOT(event_id LIKE 'setup%') + ORDER BY workload ASC; + """ + ).df() + return df['workload'] + + +@st.cache_data +def get_scale_factors(): + connection = get_connection() + df = connection.execute( + f""" + SELECT DISTINCT json(event_data)->>'scale_factor' AS scale_factor + FROM combined_experiment_telemetry + WHERE event_type = 'EXEC_EXPERIMENT' AND event_status='SUCCESS' AND NOT(event_id LIKE 'setup%') + ORDER BY scale_factor ASC; + """ + ).df() + return df['scale_factor'] + + +def get_experiments_selected( + _workload_selected: str, + _systems_selected: list[str], + _table_formats_selected: list[str], + _modes_selected: list[str], + _cluster_sizes_selected: list[str], + _machines_selected: list[str], + _scale_factors_selected: list[str]) -> pd.DataFrame: + connection = get_connection() + df = connection.execute( + f""" + SELECT run_id, event_start_time, event_end_time, event_id, + concat_ws('-', json(event_data)->>'system', json(event_data)->>'system_version') AS system, + concat_ws('-', json(event_data)->>'table_format', json(event_data)->>'table_format_version') AS table_format, + cast(json(event_data)->>'mode' AS VARCHAR) AS mode, + cast(json(event_data)->>'cluster_size' AS VARCHAR) AS cluster_size, + cast(json(event_data)->>'machine' AS VARCHAR) AS machine, + cast(json(event_data)->>'scale_factor' AS VARCHAR) AS scale_factor + FROM combined_experiment_telemetry + WHERE event_type = 'EXEC_EXPERIMENT' AND event_status='SUCCESS' AND event_id = '{_workload_selected}' + AND concat_ws('-', json(event_data)->>'system', json(event_data)->>'system_version') IN ({', '.join(["'" + system + "'" for system in _systems_selected])}) + AND concat_ws('-', json(event_data)->>'table_format', json(event_data)->>'table_format_version') IN ({', '.join(["'" + table_format + "'" for table_format in _table_formats_selected])}) + AND cast(json(event_data)->>'mode' AS VARCHAR) IN ({', '.join(["'" + mode + "'" for mode in _modes_selected])}) + AND cast(json(event_data)->>'cluster_size' AS VARCHAR) IN ({', '.join(["'" + cluster_size + "'" for cluster_size in _cluster_sizes_selected])}) + AND cast(json(event_data)->>'machine' AS VARCHAR) IN ({', '.join(["'" + machine + "'" for machine in _machines_selected])}) + AND cast(json(event_data)->>'scale_factor' AS VARCHAR) IN ({', '.join(["'" + scale_factor + "'" for scale_factor in _scale_factors_selected])}) + ORDER BY cast(event_start_time AS TIMESTAMP) ASC; + """ + ).df() + logging.debug(df) + return df + + +@st.cache_data +def get_experiments_data(experiments_df: pd.DataFrame, target_granularity: str) -> pd.DataFrame: + connection = get_connection() + df = experiments_df + granularities = { + 'phase': 'EXEC_PHASE', + 'session': 'EXEC_SESSION', + 'task': 'EXEC_TASK', + 'file': 'EXEC_FILE' + } + for granularity in granularities: + new_experiments_data_df = pd.DataFrame() + for idx, (run_id, event_start_time, event_end_time, event_id, system, table_format, mode, cluster_size, machine, + scale_factor) in enumerate(df.itertuples(index=False)): + new_experiment_data_df = connection.execute( + f""" + SELECT run_id, event_start_time, event_end_time, + concat_ws('/', CASE WHEN event_type = 'EXEC_PHASE' THEN NULL ELSE '{event_id}' END, regexp_replace(event_id, '(_delta|_iceberg|_hudi)', '')) AS 
event_id + FROM combined_experiment_telemetry + WHERE run_id = ? AND event_type = ? AND event_status='SUCCESS' + AND cast(event_start_time AS TIMESTAMP) >= ? AND cast(event_end_time AS TIMESTAMP) <= ? + ORDER BY cast(event_start_time AS TIMESTAMP) ASC; + """, + [run_id, granularities.get(granularity), event_start_time, event_end_time]).df() + new_experiment_data_df["system"] = system + new_experiment_data_df["table_format"] = table_format + new_experiment_data_df["mode"] = mode + new_experiment_data_df["cluster_size"] = cluster_size + new_experiment_data_df["machine"] = machine + new_experiment_data_df["scale_factor"] = scale_factor + new_experiments_data_df = pd.concat([new_experiments_data_df, new_experiment_data_df]) + df = new_experiments_data_df + if granularity == target_granularity: + break + logging.debug(df) + df['configuration'] = df.apply( + lambda row: (row['system'] + ", " + + row['table_format'] + ", " + + row['mode'] + ", " + + row['cluster_size'] + "x" + row['machine']), + axis=1) + # Calculate latency for each element. + df['time_diff_in_mins'] = df.apply( + lambda row: utils.time_diff_in_minutes(row['event_start_time'], row['event_end_time']), + axis=1) + return df + + +st.set_page_config( + page_title="LST-Bench - Dashboard", + page_icon=":bar_chart:", + layout="wide") +st.title('LST-Bench - Dashboard') +st.write("[Project Page](https://github.com/microsoft/lst-bench/) | " + "[Technical Report](https://arxiv.org/abs/2305.01120) | " + "[Evaluation](https://github.com/microsoft/lst-bench/tree/main/metrics/app#evaluation) | " + "[Adding a New Result](https://github.com/microsoft/lst-bench/tree/main/metrics/app#adding-a-new-result)") + +workloads = get_workloads() +workload_selected = st.sidebar.selectbox('Workload', workloads, index=0) + +systems = get_systems() +systems_selected = st.sidebar.multiselect('System', systems, default=systems) + +table_formats = get_table_formats() +table_formats_selected = st.sidebar.multiselect('Table Format', table_formats, default=table_formats) + +modes = get_modes() +modes_selected = st.sidebar.multiselect('Mode', modes, default=modes) + +cluster_sizes = get_cluster_sizes() +cluster_sizes_selected = st.sidebar.multiselect('Cluster Size', cluster_sizes, default=cluster_sizes) + +machines = get_machines() +machines_selected = st.sidebar.multiselect('Machine', machines, default=machines) + +scale_factors = get_scale_factors() +scale_factors_selected = st.sidebar.multiselect('Scale Factor', scale_factors, default=scale_factors) + +# Bail out if any of the dimensions if empty +if any(len(arr) == 0 for arr in [systems_selected, table_formats_selected, + modes_selected, cluster_sizes_selected, + machines_selected, scale_factors_selected]): + st.error("Please ensure you have selected at least one option for each dimension.") + st.stop() + +# Create tabs for current selection +exec_time_tab = None # This tab shows execution time. +performance_degradation_tab = None # This tab shows degradation rate. +# TODO +io_tab = None # This tab will show I/O metrics, such as bytes read/written. +io_api_calls_tab = None # This tab will show I/O API call metrics. +cpu_utilization_tab = None # This tab will show CPU utilization metrics. 
+ +if workload_selected == 'wp1_longevity': + exec_time_tab, performance_degradation_tab = st.tabs(['Execution Time', 'Performance Degradation']) +else: + exec_time_tab = st.tabs(['Execution Time'])[0] + +if exec_time_tab is not None: + granularity_selected = exec_time_tab.radio( + 'Granularity:', + ['phase', 'session', 'task', 'file'], + horizontal=True) + regex = exec_time_tab.text_input('Filter Results:', placeholder='Regular Expression (Regex)') + + # --- Data manipulations --- # + experiments_selected_df = get_experiments_selected(workload_selected, systems_selected, table_formats_selected, + modes_selected, cluster_sizes_selected, machines_selected, + scale_factors_selected) + experiments_data_df = get_experiments_data(experiments_selected_df, granularity_selected) + experiments_data_df = experiments_data_df[experiments_data_df['event_id'].str.contains(regex, regex=True)] + + if len(experiments_data_df) > 3000: + st.error( + "Too many rows in the result. " + "Please refine your dimension selection or apply a regex filter to narrow down the results.") + st.stop() + + # --- Plot the data --- # + chart = ( + alt.Chart(experiments_data_df) + .mark_bar() + .encode( + alt.X("configuration:N", axis=None, title='Configuration', stack=None), + alt.Y("time_diff_in_mins:Q", title='Latency (mins)', axis=alt.Axis(titleFontWeight='bold')), + alt.Color("configuration:N", legend=alt.Legend(titleFontWeight='bold', labelLimit=400), + title='Configuration'), + alt.Column("event_id:N", title="", + header=alt.Header(orient='bottom', labelFontWeight='bold', labelAlign='right', + labelAngle=-45, labelPadding=20), + sort=alt.SortField("event_start_time", order="ascending")) + ) + .configure_range( + category={'scheme': 'dark2'} + ) + ) + exec_time_tab.markdown('#') + exec_time_tab.altair_chart(chart, theme=None) + +if performance_degradation_tab is not None: + # --- Data manipulations --- # + experiments_selected_df = get_experiments_selected(workload_selected, systems_selected, table_formats_selected, + modes_selected, cluster_sizes_selected, machines_selected, + scale_factors_selected) + experiments_data_df = get_experiments_data(experiments_selected_df, 'phase') + # Filter rows with event_id following the format _ + experiments_data_df = experiments_data_df[experiments_data_df['event_id'].str.match(r'^.+_\d+$')] + # Extract name part from event_id + experiments_data_df['phase_type'] = experiments_data_df['event_id'].str.extract(r'^(.+)_\d+$') + # Group by each distinct 'configuration' and 'phase_type' + grouped_df = experiments_data_df.groupby(['configuration', 'phase_type']) + # Compute performance degradation + grouped_df = grouped_df['time_diff_in_mins'].agg(performance_degradation_rate=utils.performance_degradation) + grouped_df = grouped_df.reset_index() + + # --- Plot the data --- # + # X axis: phase type + # Y axis: configuration + # score: degradation rate + base = ( + alt.Chart(grouped_df) + .encode( + alt.X("phase_type:N", title='', axis=alt.Axis(labelFontWeight='bold', labelAngle=-45)), + alt.Y("configuration:N", title='Configuration', + axis=alt.Axis(titleFontWeight='bold', maxExtent=430, labelLimit=400)) + ) + ) + heatmap = ( + base.mark_rect() + .encode( + alt.Color('performance_degradation_rate:Q', + scale=alt.Scale(scheme='redblue', reverse=True), + title='Performance Degradation Rate', + legend=alt.Legend(titleFontWeight='bold', titleLimit=400, direction="horizontal")) + ) + .properties( + height={"step": 50}, + width={"step": 50} + ) + ) + text = ( + base.mark_text() + .encode( + 
alt.Text('performance_degradation_rate:Q', format=".2f"), + color=alt.condition(alt.datum.performance_degradation_rate > 0.8, alt.value("black"), alt.value("white")) + ) + ) + performance_degradation_tab.markdown('#') + performance_degradation_tab.altair_chart(heatmap + text, theme=None) diff --git a/metrics/app/requirements.txt b/metrics/app/requirements.txt new file mode 100644 index 00000000..419d2c1a --- /dev/null +++ b/metrics/app/requirements.txt @@ -0,0 +1,4 @@ +altair==5.2.0 +duckdb==0.9.2 +pandas==2.2.0 +streamlit==1.31.0 diff --git a/metrics/app/utils.py b/metrics/app/utils.py new file mode 100644 index 00000000..4595fc8b --- /dev/null +++ b/metrics/app/utils.py @@ -0,0 +1,62 @@ +# Copyright (c) Microsoft Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime as dt + +import pandas as pd + +# -------- DATE MANIPULATIONS -------- # + +utc_format = '%Y-%m-%dT%H:%M:%S.%f%z' + + +def time_diff_in_minutes(time_str1, time_str2): + d1 = dt.datetime.strptime(time_str1, utc_format) + d2 = dt.datetime.strptime(time_str2, utc_format) + return abs((d2 - d1).seconds / 60) + + +# -------- PERFORMANCE DEGRADATION -------- # +def performance_degradation(values: pd.DataFrame) -> float: + """ + Performance degradation is measured as the average rate of change between consecutive values. + + Formula: + degradation_rate = (Σ((M[i] - M[i-1]) / M[i-1])) / (n - 1) + + Where: + - M[i] is the current value + - M[i-1] is the previous value + - n is the number of observations + + Args: + - values (pd.DataFrame): A DataFrame containing the values for which performance degradation is measured. + + Returns: + - float: The average rate of performance degradation. + """ + + # Calculate the difference between each value and its previous value + diffs = values.diff() + # Remove the first row as it will be NaN + diffs = diffs.dropna() + # Divide each difference by the current value + diffs = diffs.div(values.shift(1)) + # Calculate the average rate of change + degradation_rate = diffs.mean() + + # TODO: Consider incorporating variance to understand the variability in performance degradation. + # TODO: Handle multiple runs for more comprehensive analysis. + + return degradation_rate diff --git a/run/spark-3.3.1/results/spark-3.3.1-2024-02-01-8xStandard_E8s_v5.duckdb b/run/spark-3.3.1/results/spark-3.3.1-2024-02-01-8xStandard_E8s_v5.duckdb new file mode 100644 index 00000000..e4b5508f Binary files /dev/null and b/run/spark-3.3.1/results/spark-3.3.1-2024-02-01-8xStandard_E8s_v5.duckdb differ
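
Note on the degradation metric: `performance_degradation` in `metrics/app/utils.py` averages the relative change between consecutive latency values, i.e. `degradation_rate = (Σ((M[i] - M[i-1]) / M[i-1])) / (n - 1)`. Below is a minimal sketch of that computation on made-up numbers (not part of the PR); in the dashboard the input is the per-phase `time_diff_in_mins` series produced by the groupby in `main.py`.

```python
import pandas as pd

# Hypothetical per-phase latencies in minutes for one configuration
# (three iterations of the same phase type); the numbers are made up.
latencies = pd.Series([10.0, 11.0, 12.1])

# Same steps as utils.performance_degradation:
diffs = latencies.diff().dropna()       # M[i] - M[i-1]          -> [1.0, 1.1]
rates = diffs.div(latencies.shift(1))   # ... / M[i-1]           -> [0.10, 0.10] (plus a NaN that mean() skips)
degradation_rate = rates.mean()         # average rate of change -> 0.10

print(f"degradation rate: {degradation_rate:.2f}")
```

A result of 0.10 means each iteration ran on average 10% slower than the one before it; a negative value would indicate the configuration sped up over the course of the run.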