Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DAG contrôle sanitaire de l'eau #380

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 8 additions & 20 deletions data_processing/insee/deces/task_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,23 @@
from datagouvfr_data_pipelines.utils.datagouv import (
post_remote_resource,
update_dataset_or_resource_metadata,
check_if_recent_update,
DATAGOUV_URL,
)
from datagouvfr_data_pipelines.utils.mattermost import send_message

DAG_FOLDER = "datagouvfr_data_pipelines/data_processing/"
DATADIR = f"{AIRFLOW_DAG_TMP}deces"
minio_open = MinIOClient(bucket=MINIO_BUCKET_DATA_PIPELINE_OPEN)
with open(f"{AIRFLOW_DAG_HOME}{DAG_FOLDER}insee/deces/config/dgv.json") as fp:
config = json.load(fp)


def check_if_modif():
    """Return True when the source dataset has been updated more recently
    than our published reference resource (i.e. a new run is needed).

    Delegates the comparison to the shared `check_if_recent_update` helper
    instead of duplicating the raw data.gouv.fr API calls here.
    """
    return check_if_recent_update(
        # Arbitrary resource of OUR output dataset, used as the "last run" marker.
        reference_resource_id=config["deces_csv"][AIRFLOW_ENV]["resource_id"],
        # Source dataset (INSEE deces) whose resources are checked for updates.
        dataset_id="5de8f397634f4164071119c5",
        on_demo=AIRFLOW_ENV == "dev",
    )


Expand Down Expand Up @@ -206,8 +198,6 @@ def send_to_minio():
def publish_on_datagouv(ti):
min_date = ti.xcom_pull(key="min_date", task_ids="gather_data")
max_date = ti.xcom_pull(key="max_date", task_ids="gather_data")
with open(f"{AIRFLOW_DAG_HOME}{DAG_FOLDER}insee/deces/config/dgv.json") as fp:
config = json.load(fp)
for _ext in ["csv", "parquet"]:
post_remote_resource(
dataset_id=config[f"deces_{_ext}"][AIRFLOW_ENV]["dataset_id"],
Expand Down Expand Up @@ -242,9 +232,7 @@ def publish_on_datagouv(ti):


def notification_mattermost():
with open(f"{AIRFLOW_DAG_HOME}{DAG_FOLDER}insee/deces/config/dgv.json") as fp:
data = json.load(fp)
dataset_id = data["deces_csv"][AIRFLOW_ENV]["dataset_id"]
dataset_id = config["deces_csv"][AIRFLOW_ENV]["dataset_id"]
send_message(
f"Données décès agrégées :"
f"\n- uploadées sur Minio"
Expand Down
27 changes: 7 additions & 20 deletions data_processing/rna/task_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
)
from datagouvfr_data_pipelines.utils.datagouv import (
post_remote_resource,
check_if_recent_update,
DATAGOUV_URL,
)
from datagouvfr_data_pipelines.utils.mattermost import send_message
Expand All @@ -23,25 +24,15 @@
DAG_FOLDER = "datagouvfr_data_pipelines/data_processing/"
DATADIR = f"{AIRFLOW_DAG_TMP}rna"
minio_open = MinIOClient(bucket=MINIO_BUCKET_DATA_PIPELINE_OPEN)
with open(f"{AIRFLOW_DAG_HOME}{DAG_FOLDER}rna/config/dgv.json") as fp:
config = json.load(fp)


def check_if_modif():
    """Return True when the source dataset has been updated more recently
    than our published reference resource (i.e. a new run is needed).

    Delegates the comparison to the shared `check_if_recent_update` helper
    instead of duplicating the raw data.gouv.fr API calls here.
    """
    # we consider one arbitrary resource of the target dataset
    return check_if_recent_update(
        reference_resource_id=config["import"]["csv"][AIRFLOW_ENV]["resource_id"],
        # Source dataset (RNA) whose resources are checked for updates.
        dataset_id="58e53811c751df03df38f42d",
        on_demo=AIRFLOW_ENV == "dev",
    )


Expand Down Expand Up @@ -116,8 +107,6 @@ def publish_on_datagouv(ti, file_type):
latest = ti.xcom_pull(key="latest", task_ids=f"process_rna_{file_type}")
y, m, d = latest[:4], latest[4:6], latest[6:]
date = f"{d} {MOIS_FR[m]} {y}"
with open(f"{AIRFLOW_DAG_HOME}{DAG_FOLDER}rna/config/dgv.json") as fp:
config = json.load(fp)
for ext in ["csv", "parquet"]:
post_remote_resource(
dataset_id=config[file_type][ext][AIRFLOW_ENV]["dataset_id"],
Expand All @@ -142,8 +131,6 @@ def publish_on_datagouv(ti, file_type):


def send_notification_mattermost():
with open(f"{AIRFLOW_DAG_HOME}{DAG_FOLDER}rna/config/dgv.json") as fp:
config = json.load(fp)
dataset_id = config["import"]["csv"][AIRFLOW_ENV]["dataset_id"]
send_message(
text=(
Expand Down
92 changes: 92 additions & 0 deletions data_processing/sante/controle_sanitaire_eau/DAG.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
from datetime import datetime, timedelta
import json
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, ShortCircuitOperator

from datagouvfr_data_pipelines.config import (
AIRFLOW_DAG_TMP,
AIRFLOW_DAG_HOME,
)
from datagouvfr_data_pipelines.data_processing.sante.controle_sanitaire_eau.task_functions import (
check_if_modif,
process_data,
send_to_minio,
publish_on_datagouv,
send_notification_mattermost,
)

TMP_FOLDER = f"{AIRFLOW_DAG_TMP}controle_sanitaire_eau/"
DAG_FOLDER = 'datagouvfr_data_pipelines/data_processing/'
DAG_NAME = 'data_processing_controle_sanitaire_eau'
DATADIR = f"{TMP_FOLDER}data"

# Processing configuration, loaded once at DAG parse time: one entry per
# file type, each mapping output formats to env-specific datagouv targets.
with open(f"{AIRFLOW_DAG_HOME}{DAG_FOLDER}sante/controle_sanitaire_eau/config/dgv.json") as fp:
    config = json.load(fp)

default_args = {
    'retries': 5,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id=DAG_NAME,
    schedule_interval='0 7 * * *',
    start_date=datetime(2024, 8, 10),
    catchup=False,
    dagrun_timeout=timedelta(minutes=240),
    tags=["data_processing", "sante", "eau"],
    default_args=default_args,
) as dag:

    # Short-circuit the whole run when the source data has not changed.
    check_if_modif = ShortCircuitOperator(
        task_id='check_if_modif',
        python_callable=check_if_modif,
    )

    # Start each run from an empty temporary workspace.
    clean_previous_outputs = BashOperator(
        task_id="clean_previous_outputs",
        bash_command=f"rm -rf {TMP_FOLDER} && mkdir -p {TMP_FOLDER}",
    )

    process_data = PythonOperator(
        task_id='process_data',
        python_callable=process_data,
    )

    # Build one (minio upload, datagouv publication) task pair per file type
    # declared in the config.
    type_tasks = {}
    for ftype in config:
        upload = PythonOperator(
            task_id=f'send_to_minio_{ftype}',
            python_callable=send_to_minio,
            op_kwargs={
                "file_type": ftype,
            },
        )
        publication = PythonOperator(
            task_id=f'publish_on_datagouv_{ftype}',
            python_callable=publish_on_datagouv,
            op_kwargs={
                "file_type": ftype,
            },
        )
        type_tasks[ftype] = [upload, publication]

    clean_up = BashOperator(
        task_id="clean_up",
        bash_command=f"rm -rf {TMP_FOLDER}",
    )

    send_notification_mattermost = PythonOperator(
        task_id='send_notification_mattermost',
        python_callable=send_notification_mattermost,
    )

    # Wiring: check -> clean -> process -> (upload -> publish) per type
    #         -> clean_up -> notify
    check_if_modif >> clean_previous_outputs >> process_data
    for upload, publication in type_tasks.values():
        process_data >> upload >> publication >> clean_up
    clean_up >> send_notification_mattermost
12 changes: 12 additions & 0 deletions data_processing/sante/controle_sanitaire_eau/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Documentation

## data_processing_controle_sanitaire_eau

| Information | Valeur |
| -------- | -------- |
| Fichier source | `DAG.py` |
| Description | Ce traitement permet de créer des fichiers agrégés (csv et parquet) pour les données du contrôle sanitaire de l'eau (RESULT, COM_UDI, PLV). |
| Fréquence de mise à jour | Quotidienne |
| Données sources | Contrôle sanitaire de l'eau distribuée (ministère chargé de la Santé) <!-- TODO: lien vers le JDD source à confirmer --> |
| Données de sorties | [Jeu de données](https://www.data.gouv.fr/fr/datasets/5cf8d9ed8b4c4110294c841d/) |
| Channel Mattermost d'information | ~startup-datagouv-dataeng |
77 changes: 77 additions & 0 deletions data_processing/sante/controle_sanitaire_eau/config/dgv.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
{
"RESULT": {
"csv.gz": {
"dev": {
"dataset_id": "5cf8d9ed8b4c4110294c841d",
"resource_id": "b1b82209-93c4-4e51-af26-a9a404ba1fe2"
},
"prod": {
"dataset_id": "5cf8d9ed8b4c4110294c841d",
"resource_id": "fee50ac3-df62-4b38-aaf5-4831c6ba65c7"
}
},
"parquet": {
"dev": {
"dataset_id": "5cf8d9ed8b4c4110294c841d",
"resource_id": "97cf8671-ec7f-4e33-a108-d22c33107970"
},
"prod": {
"dataset_id": "5cf8d9ed8b4c4110294c841d",
"resource_id": "50f3cd73-4ce5-4c04-a45c-f883aae389be"
}
},
"dtype": {
"cdparametre": "INT32"
}
},
"COM_UDI": {
"csv": {
"dev": {
"dataset_id": "5cf8d9ed8b4c4110294c841d",
"resource_id": "97c7e4ee-d5be-4709-b255-c4506aa42bff"
},
"prod": {
"dataset_id": "5cf8d9ed8b4c4110294c841d",
"resource_id": "457da169-3c92-47e1-8822-e727665f3ac2"
}
},
"parquet": {
"dev": {
"dataset_id": "5cf8d9ed8b4c4110294c841d",
"resource_id": "fffd9023-107c-4ed3-a187-e2038a9c35ff"
},
"prod": {
"dataset_id": "5cf8d9ed8b4c4110294c841d",
"resource_id": "211c23ea-60eb-4b3f-9000-ae319c16cfc9"
}
},
"dtype": {
"debutalim": "DATE"
}
},
"PLV": {
"csv": {
"dev": {
"dataset_id": "5cf8d9ed8b4c4110294c841d",
"resource_id": "2a0605b7-72dd-4529-a0d4-73a471a5cc87"
},
"prod": {
"dataset_id": "5cf8d9ed8b4c4110294c841d",
"resource_id": "1f284e4f-bcc1-47aa-a00e-38da45b0c3b2"
}
},
"parquet": {
"dev": {
"dataset_id": "5cf8d9ed8b4c4110294c841d",
"resource_id": "cacb4bf4-7bff-4c57-8c62-850bdc37be83"
},
"prod": {
"dataset_id": "5cf8d9ed8b4c4110294c841d",
"resource_id": "dd0f0c3a-49dc-4627-8d13-65b463041500"
}
},
"dtype": {
"dateprel": "DATE"
}
}
}
Loading