diff --git a/data_processing/insee/deces/task_functions.py b/data_processing/insee/deces/task_functions.py
index ef8dea4a..161ceccd 100644
--- a/data_processing/insee/deces/task_functions.py
+++ b/data_processing/insee/deces/task_functions.py
@@ -16,6 +16,7 @@
 from datagouvfr_data_pipelines.utils.datagouv import (
     post_remote_resource,
     update_dataset_or_resource_metadata,
+    check_if_recent_update,
     DATAGOUV_URL,
 )
 from datagouvfr_data_pipelines.utils.mattermost import send_message
@@ -23,24 +24,15 @@
 DAG_FOLDER = "datagouvfr_data_pipelines/data_processing/"
 DATADIR = f"{AIRFLOW_DAG_TMP}deces"
 minio_open = MinIOClient(bucket=MINIO_BUCKET_DATA_PIPELINE_OPEN)
 
+with open(f"{AIRFLOW_DAG_HOME}{DAG_FOLDER}insee/deces/config/dgv.json") as fp:
+    config = json.load(fp)
 
 
 def check_if_modif():
-    with open(f"{AIRFLOW_DAG_HOME}{DAG_FOLDER}insee/deces/config/dgv.json") as fp:
-        config = json.load(fp)
-    resources = requests.get(
-        'https://www.data.gouv.fr/api/1/datasets/5de8f397634f4164071119c5/',
-        headers={"X-fields": "resources{internal{last_modified_internal}}"}
-    ).json()['resources']
-    lastest_update = requests.get(
-        (
-            f'{DATAGOUV_URL}/api/1/datasets/{config["deces_csv"][AIRFLOW_ENV]["dataset_id"]}/'
-            f'resources/{config["deces_csv"][AIRFLOW_ENV]["resource_id"]}/'
-        ),
-        headers={"X-fields": "internal{last_modified_internal}"}
-    ).json()["internal"]["last_modified_internal"]
-    return any(
-        r["internal"]["last_modified_internal"] > lastest_update for r in resources
+    return check_if_recent_update(
+        reference_resource_id=config["deces_csv"][AIRFLOW_ENV]["resource_id"],
+        dataset_id="5de8f397634f4164071119c5",
+        on_demo=AIRFLOW_ENV == "dev",
     )
 
@@ -206,8 +198,6 @@ def send_to_minio():
 def publish_on_datagouv(ti):
     min_date = ti.xcom_pull(key="min_date", task_ids="gather_data")
     max_date = ti.xcom_pull(key="max_date", task_ids="gather_data")
-    with open(f"{AIRFLOW_DAG_HOME}{DAG_FOLDER}insee/deces/config/dgv.json") as fp:
-        config = json.load(fp)
     for _ext in ["csv", "parquet"]:
         post_remote_resource(
             dataset_id=config[f"deces_{_ext}"][AIRFLOW_ENV]["dataset_id"],
@@ -242,9 +232,7 @@
 
 
 def notification_mattermost():
-    with open(f"{AIRFLOW_DAG_HOME}{DAG_FOLDER}insee/deces/config/dgv.json") as fp:
-        data = json.load(fp)
-    dataset_id = data["deces_csv"][AIRFLOW_ENV]["dataset_id"]
+    dataset_id = config["deces_csv"][AIRFLOW_ENV]["dataset_id"]
     send_message(
         f"Données décès agrégées :"
         f"\n- uploadées sur Minio"
diff --git a/data_processing/rna/task_functions.py b/data_processing/rna/task_functions.py
index 9677bfc2..8e12fdb7 100644
--- a/data_processing/rna/task_functions.py
+++ b/data_processing/rna/task_functions.py
@@ -14,6 +14,7 @@
 )
 from datagouvfr_data_pipelines.utils.datagouv import (
     post_remote_resource,
+    check_if_recent_update,
     DATAGOUV_URL,
 )
 from datagouvfr_data_pipelines.utils.mattermost import send_message
@@ -23,25 +24,15 @@
 DAG_FOLDER = "datagouvfr_data_pipelines/data_processing/"
 DATADIR = f"{AIRFLOW_DAG_TMP}rna"
 minio_open = MinIOClient(bucket=MINIO_BUCKET_DATA_PIPELINE_OPEN)
 
+with open(f"{AIRFLOW_DAG_HOME}{DAG_FOLDER}rna/config/dgv.json") as fp:
+    config = json.load(fp)
 
 
 def check_if_modif():
-    with open(f"{AIRFLOW_DAG_HOME}{DAG_FOLDER}rna/config/dgv.json") as fp:
-        config = json.load(fp)
-    resources = requests.get(
-        'https://www.data.gouv.fr/api/1/datasets/58e53811c751df03df38f42d/',
-        headers={"X-fields": "resources{internal{last_modified_internal}}"}
-    ).json()['resources']
-    # we consider one arbitrary resource of the target dataset
-    lastest_update = requests.get(
-        (
-            f'{DATAGOUV_URL}/api/1/datasets/{config["import"]["csv"][AIRFLOW_ENV]["dataset_id"]}/'
-            f'resources/{config["import"]["csv"][AIRFLOW_ENV]["resource_id"]}/'
-        ),
-        headers={"X-fields": "internal{last_modified_internal}"}
-    ).json()["internal"]["last_modified_internal"]
-    return any(
-        r["internal"]["last_modified_internal"] > lastest_update for r in resources
+    return check_if_recent_update(
+        reference_resource_id=config["import"]["csv"][AIRFLOW_ENV]["resource_id"],
+        dataset_id="58e53811c751df03df38f42d",
+        on_demo=AIRFLOW_ENV == "dev",
     )
 
@@ -116,8 +107,6 @@ def publish_on_datagouv(ti, file_type):
     latest = ti.xcom_pull(key="latest", task_ids=f"process_rna_{file_type}")
     y, m, d = latest[:4], latest[4:6], latest[6:]
     date = f"{d} {MOIS_FR[m]} {y}"
-    with open(f"{AIRFLOW_DAG_HOME}{DAG_FOLDER}rna/config/dgv.json") as fp:
-        config = json.load(fp)
     for ext in ["csv", "parquet"]:
         post_remote_resource(
             dataset_id=config[file_type][ext][AIRFLOW_ENV]["dataset_id"],
@@ -142,8 +131,6 @@
 
 
 def send_notification_mattermost():
-    with open(f"{AIRFLOW_DAG_HOME}{DAG_FOLDER}rna/config/dgv.json") as fp:
-        config = json.load(fp)
     dataset_id = config["import"]["csv"][AIRFLOW_ENV]["dataset_id"]
     send_message(
         text=(
diff --git a/data_processing/sante/controle_sanitaire_eau/DAG.py b/data_processing/sante/controle_sanitaire_eau/DAG.py
new file mode 100644
index 00000000..5e477cdf
--- /dev/null
+++ b/data_processing/sante/controle_sanitaire_eau/DAG.py
@@ -0,0 +1,92 @@
+from datetime import datetime, timedelta
+import json
+from airflow.models import DAG
+from airflow.operators.bash import BashOperator
+from airflow.operators.python import PythonOperator, ShortCircuitOperator
+
+from datagouvfr_data_pipelines.config import (
+    AIRFLOW_DAG_TMP,
+    AIRFLOW_DAG_HOME,
+)
+from datagouvfr_data_pipelines.data_processing.sante.controle_sanitaire_eau.task_functions import (
+    check_if_modif,
+    process_data,
+    send_to_minio,
+    publish_on_datagouv,
+    send_notification_mattermost,
+)
+
+TMP_FOLDER = f"{AIRFLOW_DAG_TMP}controle_sanitaire_eau/"
+DAG_FOLDER = 'datagouvfr_data_pipelines/data_processing/'
+DAG_NAME = 'data_processing_controle_sanitaire_eau'
+DATADIR = f"{TMP_FOLDER}data"
+
+with open(f"{AIRFLOW_DAG_HOME}{DAG_FOLDER}sante/controle_sanitaire_eau/config/dgv.json") as fp:
+    config = json.load(fp)
+
+default_args = {
+    'retries': 5,
+    'retry_delay': timedelta(minutes=5),
+}
+
+with DAG(
+    dag_id=DAG_NAME,
+    schedule_interval='0 7 * * *',
+    start_date=datetime(2024, 8, 10),
+    catchup=False,
+    dagrun_timeout=timedelta(minutes=240),
+    tags=["data_processing", "sante", "eau"],
+    default_args=default_args,
+) as dag:
+
+    check_if_modif = ShortCircuitOperator(
+        task_id='check_if_modif',
+        python_callable=check_if_modif,
+    )
+
+    clean_previous_outputs = BashOperator(
+        task_id="clean_previous_outputs",
+        bash_command=f"rm -rf {TMP_FOLDER} && mkdir -p {TMP_FOLDER}",
+    )
+
+    process_data = PythonOperator(
+        task_id='process_data',
+        python_callable=process_data,
+    )
+
+    type_tasks = {}
+    for file_type in config.keys():
+        type_tasks[file_type] = [
+            PythonOperator(
+                task_id=f'send_to_minio_{file_type}',
+                python_callable=send_to_minio,
+                op_kwargs={
+                    "file_type": file_type,
+                },
+            ),
+            PythonOperator(
+                task_id=f'publish_on_datagouv_{file_type}',
+                python_callable=publish_on_datagouv,
+                op_kwargs={
+                    "file_type": file_type,
+                },
+            ),
+        ]
+
+    clean_up = BashOperator(
+        task_id="clean_up",
+        bash_command=f"rm -rf {TMP_FOLDER}",
+    )
+
+    send_notification_mattermost = PythonOperator(
+        task_id='send_notification_mattermost',
+        python_callable=send_notification_mattermost,
+    )
+
+    clean_previous_outputs.set_upstream(check_if_modif)
+    process_data.set_upstream(clean_previous_outputs)
+    for file_type in config.keys():
+        type_tasks[file_type][0].set_upstream(process_data)
+        type_tasks[file_type][1].set_upstream(type_tasks[file_type][0])
+        clean_up.set_upstream(type_tasks[file_type][1])
+    send_notification_mattermost.set_upstream(clean_up)
diff --git a/data_processing/sante/controle_sanitaire_eau/README.md b/data_processing/sante/controle_sanitaire_eau/README.md
new file mode 100644
index 00000000..7f56d873
--- /dev/null
+++ b/data_processing/sante/controle_sanitaire_eau/README.md
@@ -0,0 +1,12 @@
+# Documentation
+
+## data_processing_controle_sanitaire_eau
+
+| Information | Valeur |
+| -------- | -------- |
+| Fichier source | `DAG.py` |
+| Description | Ce traitement agrège les données du contrôle sanitaire de l'eau (RESULT, PLV, COM_UDI) de tous les millésimes en fichiers uniques, en csv (csv.gz pour RESULT) et parquet, publiés en ressources communautaires. |
+| Fréquence de mise à jour | Quotidienne (si les données sources ont été mises à jour) |
+| Données sources | [Données du contrôle sanitaire de l'eau](https://www.data.gouv.fr/fr/datasets/5cf8d9ed8b4c4110294c841d/) |
+| Données de sorties | [Ressources communautaires du même jeu de données](https://www.data.gouv.fr/fr/datasets/5cf8d9ed8b4c4110294c841d/#/community-resources) |
+| Channel Mattermost d'information | ~startup-datagouv-dataeng |
diff --git a/data_processing/sante/controle_sanitaire_eau/config/dgv.json b/data_processing/sante/controle_sanitaire_eau/config/dgv.json
new file mode 100755
index 00000000..e1b8ee54
--- /dev/null
+++ b/data_processing/sante/controle_sanitaire_eau/config/dgv.json
@@ -0,0 +1,77 @@
+{
+    "RESULT": {
+        "csv.gz" : {
+            "dev": {
+                "dataset_id": "5cf8d9ed8b4c4110294c841d",
+                "resource_id": "b1b82209-93c4-4e51-af26-a9a404ba1fe2"
+            },
+            "prod": {
+                "dataset_id": "5cf8d9ed8b4c4110294c841d",
+                "resource_id": "fee50ac3-df62-4b38-aaf5-4831c6ba65c7"
+            }
+        },
+        "parquet" : {
+            "dev": {
+                "dataset_id": "5cf8d9ed8b4c4110294c841d",
+                "resource_id": "97cf8671-ec7f-4e33-a108-d22c33107970"
+            },
+            "prod": {
+                "dataset_id": "5cf8d9ed8b4c4110294c841d",
+                "resource_id": "50f3cd73-4ce5-4c04-a45c-f883aae389be"
+            }
+        },
+        "dtype": {
+            "cdparametre": "INT32"
+        }
+    },
+    "COM_UDI": {
+        "csv" : {
+            "dev": {
+                "dataset_id": "5cf8d9ed8b4c4110294c841d",
+                "resource_id": "97c7e4ee-d5be-4709-b255-c4506aa42bff"
+            },
+            "prod": {
+                "dataset_id": "5cf8d9ed8b4c4110294c841d",
+                "resource_id": "457da169-3c92-47e1-8822-e727665f3ac2"
+            }
+        },
+        "parquet": {
+            "dev": {
+                "dataset_id": "5cf8d9ed8b4c4110294c841d",
+                "resource_id": "fffd9023-107c-4ed3-a187-e2038a9c35ff"
+            },
+            "prod": {
+                "dataset_id": "5cf8d9ed8b4c4110294c841d",
+                "resource_id": "211c23ea-60eb-4b3f-9000-ae319c16cfc9"
+            }
+        },
+        "dtype": {
+            "debutalim": "DATE"
+        }
+    },
+    "PLV": {
+        "csv" : {
+            "dev": {
+                "dataset_id": "5cf8d9ed8b4c4110294c841d",
+                "resource_id": "2a0605b7-72dd-4529-a0d4-73a471a5cc87"
+            },
+            "prod": {
+                "dataset_id": "5cf8d9ed8b4c4110294c841d",
+                "resource_id": "1f284e4f-bcc1-47aa-a00e-38da45b0c3b2"
+            }
+        },
+        "parquet": {
+            "dev": {
+                "dataset_id": "5cf8d9ed8b4c4110294c841d",
+                "resource_id": "cacb4bf4-7bff-4c57-8c62-850bdc37be83"
+            },
+            "prod": {
+                "dataset_id": "5cf8d9ed8b4c4110294c841d",
+                "resource_id": "dd0f0c3a-49dc-4627-8d13-65b463041500"
+            }
+        },
+        "dtype": {
+            "dateprel": "DATE"
+        }
+    }
+}
diff --git a/data_processing/sante/controle_sanitaire_eau/task_functions.py b/data_processing/sante/controle_sanitaire_eau/task_functions.py
new file mode 100644
index 00000000..86ba41dc
--- /dev/null
+++ b/data_processing/sante/controle_sanitaire_eau/task_functions.py
@@ -0,0 +1,147 @@
+from datetime import datetime
+from io import BytesIO
+import json
+import os
+from zipfile import ZipFile
+
+import pandas as pd
+import re
+import requests
+
+from datagouvfr_data_pipelines.config import (
+    AIRFLOW_DAG_HOME,
+    AIRFLOW_DAG_TMP,
+    AIRFLOW_ENV,
+    MINIO_BUCKET_DATA_PIPELINE_OPEN,
+)
+from datagouvfr_data_pipelines.utils.datagouv import (
+    post_remote_communautary_resource,
+    check_if_recent_update,
+    DATAGOUV_URL,
+)
+from datagouvfr_data_pipelines.utils.mattermost import send_message
+from datagouvfr_data_pipelines.utils.minio import MinIOClient
+from datagouvfr_data_pipelines.utils.utils import (
+    csv_to_parquet,
+    csv_to_csvgz,
+)
+
+DAG_FOLDER = "datagouvfr_data_pipelines/data_processing/"
+DATADIR = f"{AIRFLOW_DAG_TMP}controle_sanitaire_eau"
+minio_open = MinIOClient(bucket=MINIO_BUCKET_DATA_PIPELINE_OPEN)
+
+with open(f"{AIRFLOW_DAG_HOME}{DAG_FOLDER}sante/controle_sanitaire_eau/config/dgv.json") as fp:
+    config = json.load(fp)
+
+
+def check_if_modif():
+    return check_if_recent_update(
+        reference_resource_id=config["RESULT"]["parquet"][AIRFLOW_ENV]["resource_id"],
+        dataset_id="5cf8d9ed8b4c4110294c841d",
+    )
+
+
+def process_data():
+    # this is done in one task to get the files only once
+    resources = requests.get(
+        'https://www.data.gouv.fr/api/1/datasets/5cf8d9ed8b4c4110294c841d/',
+        headers={"X-fields": "resources{title,url}"}
+    ).json()['resources']
+    resources = [r for r in resources if re.search(r"dis-\d{4}.zip", r["title"])]
+    columns = {file_type: [] for file_type in config.keys()}
+    for idx, resource in enumerate(resources):
+        print(resource["title"])
+        year = int(resource["title"].split(".")[0].split("-")[1])
+        r = requests.get(resource["url"])
+        r.raise_for_status()
+        with ZipFile(BytesIO(r.content)) as zip_ref:
+            for _, file in enumerate(zip_ref.namelist()):
+                print(">", file)
+                file_type = "_".join(file.split("_")[1:-1])
+                assert file_type in config.keys()
+                with zip_ref.open(file) as f:
+                    df = pd.read_csv(
+                        f,
+                        sep=',',
+                        dtype=str,
+                    )
+                if not columns[file_type]:
+                    columns[file_type] = list(df.columns)
+                elif list(df.columns) != columns[file_type]:
+                    print(columns[file_type])
+                    print(list(df.columns))
+                    raise ValueError('Columns differ between files')
+                df["annee"] = year
+                df.to_csv(
+                    f"{DATADIR}/{file_type}.csv",
+                    index=False,
+                    encoding="utf8",
+                    mode="w" if idx == 0 else "a",
+                    header=idx == 0,
+                )
+                del df
+    for file_type in config.keys():
+        csv_to_parquet(
+            f"{DATADIR}/{file_type}.csv",
+            sep=',',
+            dtype={
+                # specific dtypes are listed in the config, default to str
+                c: config[file_type]["dtype"].get(c, "VARCHAR")
+                for c in columns[file_type]
+            },
+        )
+        if file_type == "RESULT":
+            # this one is too big for classic csv
+            csv_to_csvgz(f"{DATADIR}/{file_type}.csv")
+
+
+def send_to_minio(file_type):
+    minio_open.send_files(
+        list_files=[
+            {
+                "source_path": f"{DATADIR}/",
+                "source_name": f"{file_type}.{ext}",
+                "dest_path": "controle_sanitaire_eau/",
+                "dest_name": f"{file_type}.{ext}",
+            }
+            for ext in ["csv" if file_type != "RESULT" else "csv.gz", "parquet"]
+        ],
+        ignore_airflow_env=True,
+    )
+
+
+def publish_on_datagouv(file_type):
+    date = datetime.today().strftime("%d-%m-%Y")
+    for ext in ["csv" if file_type != "RESULT" else "csv.gz", "parquet"]:
+        post_remote_communautary_resource(
+            dataset_id=config[file_type][ext][AIRFLOW_ENV]["dataset_id"],
+            resource_id=config[file_type][ext][AIRFLOW_ENV]["resource_id"],
+            payload={
+                "url": (
+                    f"https://object.files.data.gouv.fr/{MINIO_BUCKET_DATA_PIPELINE_OPEN}"
+                    f"/controle_sanitaire_eau/{file_type}.{ext}"
+                ),
+                "filesize": os.path.getsize(DATADIR + f"/{file_type}.{ext}"),
+                "title": (
+                    f"Données {file_type} (format {ext})"
+                ),
+                "format": ext,
+                "description": (
+                    f"{file_type} (format {ext})"
+                    " (créé à partir des [fichiers du Ministère des Solidarités et de la santé]"
+                    f"({DATAGOUV_URL}/fr/datasets/{config[file_type][ext][AIRFLOW_ENV]['dataset_id']}/))"
+                    f" (dernière mise à jour le {date})"
+                ),
+            },
+        )
+
+
+def send_notification_mattermost():
+    dataset_id = config["RESULT"]["parquet"][AIRFLOW_ENV]["dataset_id"]
+    send_message(
+        text=(
+            ":mega: Données du contrôle sanitaire de l'eau mises à jour.\n"
+            f"- Données stockées sur Minio - Bucket {MINIO_BUCKET_DATA_PIPELINE_OPEN}\n"
+            f"- Données publiées [sur data.gouv.fr]({DATAGOUV_URL}/fr/datasets/{dataset_id}/#/community-resources)"
+        )
+    )
diff --git a/utils/datagouv.py b/utils/datagouv.py
index 049c5a39..4fe824f7 100644
--- a/utils/datagouv.py
+++ b/utils/datagouv.py
@@ -478,7 +478,7 @@ def post_remote_communautary_resource(
     print("Payload content:\n", payload)
 
     if resource_id:
-        print(f"Updating resource at {dataset_link} from {payload['remote_url']}")
+        print(f"Updating resource at {dataset_link} from {payload['url']}")
         # Update resource
         refined_url = community_resource_url + f"/{resource_id}"
         r = datagouv_session.put(
@@ -487,7 +487,7 @@
         )
 
     else:
-        print(f"Creating resource at {dataset_link} from {payload['remote_url']}")
+        print(f"Creating resource at {dataset_link} from {payload['url']}")
        # Create resource
         r = datagouv_session.post(
             community_resource_url,
@@ -566,3 +566,26 @@ def check_duplicated_orga(slug: str) -> tuple[bool, Optional[str]]:
     if test_orga.status_code not in [404, 410]:
         return True, url_dup
     return False, None
+
+
+def check_if_recent_update(
+    reference_resource_id: str,
+    dataset_id: str,
+    on_demo: bool = False,
+) -> bool:
+    """
+    Checks whether any resource of the specified dataset has been updated more recently
+    than the specified resource
+    """
+    prefix = "demo" if on_demo else "www"
+    resources = datagouv_session.get(
+        f"https://{prefix}.data.gouv.fr/api/1/datasets/{dataset_id}/",
+        headers={"X-fields": "resources{internal{last_modified_internal}}"}
+    ).json()['resources']
+    latest_update = datagouv_session.get(
+        f"https://{prefix}.data.gouv.fr/api/2/datasets/resources/{reference_resource_id}/",
+        headers={"X-fields": "internal{last_modified_internal}"}
+    ).json()["internal"]["last_modified_internal"]
+    return any(
+        r["internal"]["last_modified_internal"] > latest_update for r in resources
+    )
diff --git a/utils/utils.py b/utils/utils.py
index 48c910b3..959b8841 100644
--- a/utils/utils.py
+++ b/utils/utils.py
@@ -1,5 +1,9 @@
-import duckdb
+import csv
 from datetime import date, datetime
+import gzip
+from typing import Optional
+
+import duckdb
 
 MOIS_FR = {
     "01": "janvier",
@@ -44,23 +48,24 @@ def month_year_iter(start_month, start_year, end_month, end_year):
 
 
 def csv_to_parquet(
-    csv_file_path,
-    dtype=None,
-    columns=None,
-    output_name=None,
-    output_path=None,
-    sep=";",
-    compression="zstd",
+    csv_file_path: str,
+    dtype: Optional[dict] = None,
+    columns: Optional[list] = None,
+    output_name: Optional[str] = None,
+    output_path: Optional[str] = None,
+    sep: str = ";",
+    compression: str = "zstd",
 ):
     """
     if dtype is not specified, columns are required to load everything as string (for safety)
+    for allowed types see https://duckdb.org/docs/sql/data_types/overview.html
     """
     assert dtype is not None or columns is not None
     if output_name is None:
         output_name = csv_file_path.split('/')[-1].replace('.csv', '.parquet')
     if output_path is None:
         output_path = '/'.join(csv_file_path.split('/')[:-1]) + '/'
-    print(f"Saving {csv_file_path}")
+    print(f"Converting {csv_file_path}")
     print(f"to {output_path + output_name}")
     db = duckdb.read_csv(
         csv_file_path,
@@ -70,6 +75,29 @@
     db.write_parquet(output_path + output_name, compression=compression)
 
 
+def csv_to_csvgz(
+    csv_file_path: str,
+    output_name: Optional[str] = None,
+    output_path: Optional[str] = None,
+    chunk_size: int = 1024 * 1024,
+):
+    if output_name is None:
+        output_name = csv_file_path.split('/')[-1].replace('.csv', '.csv.gz')
+    if output_path is None:
+        output_path = '/'.join(csv_file_path.split('/')[:-1]) + '/'
+    print(f"Converting {csv_file_path}")
+    print(f"to {output_path + output_name}")
+    with (
+        open(csv_file_path, 'r', newline='', encoding='utf-8') as csvfile,
+        gzip.open(output_path + output_name, 'wt', newline='', encoding='utf-8') as gzfile
+    ):
+        while True:
+            chunk = csvfile.read(chunk_size)
+            if not chunk:
+                break
+            gzfile.write(chunk)
+
+
 def time_is_between(time1, time2):
     # no date involved here
     if time1 > time2:
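
Note from review, not part of the patch: a minimal sketch of how the helpers added above are meant to be combined in a pipeline. check_if_recent_update() backs a DAG's ShortCircuitOperator (downstream tasks run only when some resource of the source dataset is newer than the resource the pipeline last published), and csv_to_parquet() / csv_to_csvgz() then produce the files pushed to MinIO and data.gouv.fr. The dataset id, resource id, column name and path below are placeholders for illustration, not values from this PR.

from pandas import read_csv

from datagouvfr_data_pipelines.utils.datagouv import check_if_recent_update
from datagouvfr_data_pipelines.utils.utils import csv_to_csvgz, csv_to_parquet

# Placeholder identifiers: in a real DAG these come from config/dgv.json
SOURCE_DATASET_ID = "000000000000000000000000"
REFERENCE_RESOURCE_ID = "00000000-0000-0000-0000-000000000000"


def check_if_modif() -> bool:
    # Returned to a ShortCircuitOperator: False skips the rest of the DAG,
    # True lets the processing and publication tasks run.
    return check_if_recent_update(
        reference_resource_id=REFERENCE_RESOURCE_ID,
        dataset_id=SOURCE_DATASET_ID,
        on_demo=False,  # True queries demo.data.gouv.fr instead of www
    )


def convert_outputs(csv_path: str) -> None:
    # Mirror the pattern used in controle_sanitaire_eau above: declare every
    # column, defaulting to VARCHAR, with explicit duckdb types only where needed.
    columns = list(read_csv(csv_path, nrows=0).columns)
    csv_to_parquet(
        csv_path,
        sep=",",
        dtype={c: {"annee": "INT32"}.get(c, "VARCHAR") for c in columns},
    )
    # Gzip the csv when the plain file is too large to publish as-is.
    csv_to_csvgz(csv_path)

The same three-step shape (short-circuit check, processing, publication) is what the new DAG.py wires for controle_sanitaire_eau.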