-
Notifications
You must be signed in to change notification settings - Fork 17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add support for data pipeline load testing #675
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
RUN --mount=type=cache,target=/openedx/.cache/pip,sharing=shared \ | ||
pip install "platform-plugin-aspects==0.4.0" | ||
pip install "platform-plugin-aspects==v0.5.0" | ||
RUN --mount=type=cache,target=/openedx/.cache/pip,sharing=shared \ | ||
pip install "edx-event-routing-backends==v8.1.1" | ||
pip install "edx-event-routing-backends==v8.3.1" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,5 @@ | ||
""" | ||
Partition the event_sink.user_profile table | ||
|
||
.. pii: Stores Open edX user profile data. | ||
.. pii_types: user_id, name, username, location, phone_number, email_address, birth_date, biography, gender | ||
.. pii_retirement: local_api, consumer_api | ||
Partition the xapi table by year and month | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Everything in this file is just cleaning up incorrect language from the original commit. |
||
""" | ||
from alembic import op | ||
|
||
|
@@ -15,15 +11,15 @@ | |
on_cluster = " ON CLUSTER '{{CLICKHOUSE_CLUSTER_NAME}}' " if "{{CLICKHOUSE_CLUSTER_NAME}}" else "" | ||
engine = "ReplicatedReplacingMergeTree" if "{{CLICKHOUSE_CLUSTER_NAME}}" else "ReplacingMergeTree" | ||
|
||
old_user_profile_table = "{{ASPECTS_XAPI_DATABASE}}.old_{{ASPECTS_RAW_XAPI_TABLE}}" | ||
old_xapi_table = "{{ASPECTS_XAPI_DATABASE}}.old_{{ASPECTS_RAW_XAPI_TABLE}}" | ||
|
||
def upgrade(): | ||
# Partition event_sink.user_profile table | ||
# 1. Rename old table | ||
op.execute( | ||
f""" | ||
RENAME TABLE {{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_RAW_XAPI_TABLE }} | ||
TO {old_user_profile_table} | ||
TO {old_xapi_table} | ||
{on_cluster} | ||
""" | ||
) | ||
|
@@ -46,13 +42,13 @@ def upgrade(): | |
op.execute( | ||
f""" | ||
INSERT INTO {{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_RAW_XAPI_TABLE }} | ||
SELECT * FROM {old_user_profile_table} | ||
SELECT event_id, emission_time, event FROM {old_xapi_table} | ||
""" | ||
) | ||
# 4. Drop the old table | ||
op.execute( | ||
f""" | ||
DROP TABLE {old_user_profile_table} | ||
DROP TABLE {old_xapi_table} | ||
{on_cluster} | ||
""" | ||
) | ||
|
@@ -64,7 +60,7 @@ def downgrade(): | |
op.execute( | ||
f""" | ||
RENAME TABLE {{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_RAW_XAPI_TABLE }} | ||
TO {old_user_profile_table} | ||
TO {old_xapi_table} | ||
{on_cluster} | ||
""" | ||
) | ||
|
@@ -87,14 +83,14 @@ def downgrade(): | |
op.execute( | ||
f""" | ||
INSERT INTO {{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_RAW_XAPI_TABLE }} | ||
SELECT * FROM {old_user_profile_table} | ||
SELECT * FROM {old_xapi_table} | ||
""" | ||
|
||
) | ||
# 4. Drop the old table | ||
op.execute( | ||
f""" | ||
DROP TABLE {old_user_profile_table} | ||
DROP TABLE {old_xapi_table} | ||
{on_cluster} | ||
""" | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
""" | ||
Create the load_test_stats table | ||
|
||
This table is always created, but it will only be populated if the load test | ||
management commands are run from the platform_plugin_aspects app. | ||
""" | ||
from alembic import op | ||
|
||
|
||
revision = "0034" | ||
down_revision = "0033" | ||
branch_labels = None | ||
depends_on = None | ||
on_cluster = " ON CLUSTER '{{CLICKHOUSE_CLUSTER_NAME}}' " if "{{CLICKHOUSE_CLUSTER_NAME}}" else "" | ||
engine = "ReplicatedMergeTree" if "{{CLICKHOUSE_CLUSTER_NAME}}" else "MergeTree" | ||
|
||
|
||
def upgrade(): | ||
op.execute( | ||
f""" | ||
CREATE TABLE IF NOT EXISTS {{ ASPECTS_EVENT_SINK_DATABASE }}.load_test_runs | ||
{on_cluster} | ||
( | ||
run_id String, | ||
timestamp DateTime default now(), | ||
event_type String, | ||
extra String | ||
) | ||
engine = {engine} PRIMARY KEY (run_id, timestamp) | ||
ORDER BY (run_id, timestamp) | ||
SETTINGS index_granularity = 8192; | ||
""" | ||
) | ||
|
||
op.execute( | ||
f""" | ||
CREATE TABLE IF NOT EXISTS {{ ASPECTS_EVENT_SINK_DATABASE }}.load_test_stats | ||
{on_cluster} | ||
( | ||
run_id String, | ||
timestamp DateTime default now(), | ||
stats String | ||
) | ||
engine = {engine} PRIMARY KEY (run_id, timestamp) | ||
ORDER BY (run_id, timestamp) | ||
SETTINGS index_granularity = 8192; | ||
""" | ||
) | ||
|
||
|
||
def downgrade(): | ||
op.execute( | ||
"DROP TABLE IF EXISTS {{ ASPECTS_EVENT_SINK_DATABASE }}.load_test_stats" | ||
f"{on_cluster}" | ||
) | ||
|
||
op.execute( | ||
"DROP TABLE IF EXISTS {{ASPECTS_EVENT_SINK_DATABASE}}.load_test_runs" | ||
f"{on_cluster}" | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,4 +3,4 @@ data_dir = "/vector-data-dir" | |
# Vector's API for introspection | ||
[api] | ||
enabled = true | ||
address = "127.0.0.1:8686" | ||
address = "0.0.0.0:8686" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is needed to support the load test scripts hitting the Vector API, but it should still not be available outside of the cluster unless someone configures it that way on purpose. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These settings are new and let us turn on and off each piece of the Vector storage individually.