Skip to content

Commit

Permalink
Add task to write to bq
Browse files Browse the repository at this point in the history
Add alias
  • Loading branch information
amishas157 committed Dec 11, 2024
1 parent 3e56ccb commit 1102681
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 0 deletions.
1 change: 1 addition & 0 deletions airflow_variables_dev.json
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@
"create_sandbox": 2400,
"current_state": 720,
"default": 60,
"del_ins_retool_entity_data_task": 720,
"elementary_dbt_data_quality": 1620,
"elementary_generate_report": 1200,
"enriched_history_operations": 780,
Expand Down
55 changes: 55 additions & 0 deletions dags/external_data_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
It is scheduled to export information to BigQuery at regular intervals.
"""

import os
from ast import literal_eval
from datetime import datetime, timedelta
from json import loads
Expand All @@ -13,6 +14,13 @@
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s
from stellar_etl_airflow import macros
from stellar_etl_airflow.build_del_ins_from_gcs_to_bq_task import (
build_del_ins_from_gcs_to_bq_task,
)
from stellar_etl_airflow.build_del_ins_operator import (
create_del_ins_task,
initialize_task_vars,
)
from stellar_etl_airflow.default import (
alert_after_max_retries,
get_default_dag_args,
Expand All @@ -28,6 +36,9 @@
start_date=datetime(2024, 12, 5, 14, 30),
description="This DAG exports data from external sources such as retool.",
schedule_interval="*/10 * * * *",
params={
"alias": "external",
},
render_template_as_native_obj=True,
user_defined_filters={
"fromjson": lambda s: loads(s),
Expand Down Expand Up @@ -86,6 +97,12 @@ def stellar_etl_internal_task(
)


run_id = "{{ run_id }}"
filepath = os.path.join(
Variable.get("gcs_exported_object_prefix"), run_id, "retool-exported-entity.txt"
)


retool_export_task = stellar_etl_internal_task(
dag,
"export_retool_data",
Expand All @@ -99,5 +116,43 @@ def stellar_etl_internal_task(
Variable.get("gcs_exported_data_bucket_name"),
"--cloud-provider",
"gcp",
"--output",
filepath,
],
)

table_name = "retool_entity_data"
table_id = "test-hubble-319619.test_crypto_stellar_internal.retool_entity_data"
public_project = "test-hubble-319619"
public_dataset = "test_crypto_stellar_internal"
batch_id = macros.get_batch_id()
batch_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}"
export_task_id = "export_retool_data"
source_object_suffix = "/*-retool-exported-entity.txt"
source_objects = [
"{{ task_instance.xcom_pull(task_ids='"
+ export_task_id
+ '\')["output"] }}'
+ source_object_suffix
]

task_vars = {
"task_id": f"del_ins_{table_name}_task",
"project": public_project,
"dataset": public_dataset,
"table_name": table_name,
"export_task_id": "export_retool_data",
"source_object_suffix": source_object_suffix,
"partition": False,
"cluster": False,
"batch_id": batch_id,
"batch_date": batch_date,
"source_objects": source_objects,
"table_id": table_id,
}

retool_insert_to_bq_task = create_del_ins_task(
dag, task_vars, build_del_ins_from_gcs_to_bq_task
)

retool_export_task >> retool_insert_to_bq_task
163 changes: 163 additions & 0 deletions schemas/retool_entity_data_schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
[
{
"description": "",
"fields": [],
"mode": "",
"name": "batch_id",
"type": "STRING"
},
{
"description": "",
"fields": [],
"mode": "",
"name": "batch_run_date",
"type": "DATETIME"
},
{
"description": "",
"fields": [],
"mode": "",
"name": "batch_insert_ts",
"type": "TIMESTAMP"
},
{
"description": "",
"fields": [],
"mode": "",
"name": "created_at",
"type": "TIMESTAMP"
},
{
"description": "",
"fields": [],
"mode": "",
"name": "updated_at",
"type": "TIMESTAMP"
},
{
"description": "",
"fields": [],
"mode": "",
"name": "id",
"type": "INTEGER"
},
{
"description": "",
"fields": [],
"mode": "",
"name": "account_sponsor",
"type": "BOOLEAN"
},
{
"description": "",
"fields": [],
"mode": "REPEATED",
"name": "app_geographies",
"type": "STRING"
},
{
"description": "",
"fields": [],
"mode": "",
"name": "custodial",
"type": "BOOLEAN"
},
{
"description": "",
"fields": [],
"mode": "",
"name": "description",
"type": "STRING"
},
{
"description": "",
"fields": [],
"mode": "",
"name": "fee_sponsor",
"type": "BOOLEAN"
},
{
"description": "",
"fields": [],
"mode": "",
"name": "home_domain",
"type": "STRING"
},
{
"description": "",
"fields": [],
"mode": "",
"name": "home_domains_id",
"type": "INTEGER"
},
{
"description": "",
"fields": [],
"mode": "",
"name": "live",
"type": "BOOLEAN"
},
{
"description": "",
"fields": [],
"mode": "",
"name": "name",
"type": "STRING"
},
{
"description": "",
"fields": [],
"mode": "",
"name": "non_custodial",
"type": "BOOLEAN"
},
{
"description": "",
"fields": [],
"mode": "",
"name": "notes",
"type": "STRING"
},
{
"description": "",
"fields": [],
"mode": "REPEATED",
"name": "ramps",
"type": "STRING"
},
{
"description": "",
"fields": [],
"mode": "",
"name": "sdp_enabled",
"type": "BOOLEAN"
},
{
"description": "",
"fields": [],
"mode": "",
"name": "soroban_enabled",
"type": "BOOLEAN"
},
{
"description": "",
"fields": [],
"mode": "",
"name": "status",
"type": "STRING"
},
{
"description": "",
"fields": [],
"mode": "",
"name": "verified",
"type": "BOOLEAN"
},
{
"description": "",
"fields": [],
"mode": "",
"name": "website_url",
"type": "STRING"
}
]

0 comments on commit 1102681

Please sign in to comment.