feat(paysage): save
ahonestla committed Apr 18, 2024
1 parent f3310c7 commit 71286d7
Showing 3 changed files with 192 additions and 46 deletions.
164 changes: 124 additions & 40 deletions project/server/main/load_paysage.py
@@ -2,11 +2,9 @@
import datetime
import pandas as pd
import numpy as np
import requests
import json
from elasticsearch.client import IndicesClient

ODS_KEY = os.getenv("ODS_KEY")
ODS_PAYSAGE = "fr_esr_paysage_structures_all"

from project.server.main.elastic_utils import (
get_analyzers,
get_tokenizers,
@@ -30,7 +28,7 @@
logger = get_logger(__name__)

SOURCE = "paysage"
ODS_KEY = os.getenv("ODS_KEY")
ODS_PAYSAGE = "structures-de-paysage-v2"
ES_URL = os.getenv("ES_PAYSAGE_URL")
ES_TOKEN = os.getenv("ES_PAYSAGE_TOKEN")

WANTED_CATEGORIES = [
"Université",
"Établissement public expérimental",
"Établissement supérieur d'architecture",
"Organisme de recherche",
"Société d'accélération du transfert de technologies",
"Établissement d'enseignement supérieur privé d'intérêt général",
"Tutelle des établissements",
"Incubateur public",
"Liste des établissements publics relevant du ministre chargé de l'Enseignement supérieur",
"Etablissements d’enseignement supérieur techniques privés (hors formations relevant du commerce et de la gestion)",
"Etablissement publics d’enseignement supérieur entrant dans la cotutelle du ministre chargé de l’enseignement supérieur (Art L 123-1 du code de l’éducation)",
"Commerce et gestion - Etablissements d’enseignement supérieur techniques privés et consulaires autorisés à délivrer un diplôme visé par le ministre chargé de l’enseignement supérieur et/ou à conférer le grade universitaire",
"Opérateur du programme 150 - Formations supérieures et recherche universitaire",
"Structure de recherche",
# "Établissement d'enseignement supérieur étranger"
]

def load_paysage(index_prefix: str = "matcher") -> dict:
logger.debug("Start loading Paysage data...")
Expand All @@ -48,9 +67,7 @@ def load_paysage(index_prefix: str = "matcher") -> dict:
"id",
"city",
"zone_emploi",
"country_code",
"acronym",
"year",
"name",
"year",
"wikidata",
@@ -62,7 +79,6 @@ def load_paysage(index_prefix: str = "matcher") -> dict:
"id": "light",
"city": "city_analyzer",
"zone_emploi": "city_analyzer",
"country_code": "light",
"acronym": "acronym_analyzer",
"name": "heavy_fr",
"name_txt": "heavy_fr",
@@ -151,14 +167,71 @@ def load_paysage(index_prefix: str = "matcher") -> dict:
return results


def download_data() -> list:
def download_dataframe() -> pd.DataFrame:
logger.debug(f"Download Paysage data from {ODS_PAYSAGE}")
data = pd.read_csv(
f"https://data.enseignementsup-recherche.gouv.fr/explore/dataset/{ODS_PAYSAGE}/download/?format=csv&apikey={ODS_KEY}",
sep=";",
low_memory=False,
)
records = data.replace(np.nan, None).to_dict(orient="records")
return data.replace(np.nan, None)


def download_categories() -> dict:
logger.debug(f"Download Paysage categories from {ES_URL}")
keep_alive = 1
scroll_id = None
categories = {}
hits = []
size = 10000
count = 0
total = 0
headers = {"Authorization": ES_TOKEN}
url = f"{ES_URL}/paysage/_search?scroll={keep_alive}m"
query = {
"size": size,
"_source": ["id", "category"],
"query": {"match": {"type": "structures"}},
}

# Scroll to get all results
while total == 0 or count < total:
if scroll_id:
url = f"{ES_URL}/_search/scroll"
query = {"scroll": f"{keep_alive}m", "scroll_id": scroll_id}
res = requests.post(url=url, headers=headers, json=query)
if res.status_code == 200:
res_json = res.json()
scroll_id = res_json.get("_scroll_id")
total = res_json.get("hits").get("total").get("value")
data = res_json.get("hits").get("hits")
count += len(data)
sources = [d.get("_source") for d in data]
hits += sources
else:
logger.error(f"Elastic error {res.status_code}: stop scroll ({count}/{total})")
break

if hits:
categories = {item["id"]: item["category"] for item in hits}
return categories


def download_data() -> list:
# Download data
df = download_dataframe()

# Download categories
categories = download_categories()
df["category"] = df["id"].apply(lambda x: categories.get(x))

# Filter wanted categories
df_filter = df[df["category"].isin(WANTED_CATEGORIES)].copy()
logger.debug(f"Filter {len(df_filter)}/{len(df)} entries with wanted categories")

# Cast as records
records = df_filter.to_dict(orient="records")

return records


@@ -178,54 +251,65 @@ def transform_data(records: list) -> list:
logger.debug("Get data from Paysage records")
name_acronym_city = {}
for record in records:
current_id = record["identifiant_interne"]
current_id = record["id"]
name_acronym_city[current_id] = {}

# Acronyms
acronyms, names = [], []
sigle = record.get("sigle")
name_short = record.get("nom_court")
if sigle:
acronyms.append(sigle)
if name_short:
if name_short.isalnum():
acronyms.append(name_short)
else:
names.append(name_short)
acronyms_list = ["acronymfr", "acronymen", "acronymlocal"]
acronyms = [record.get(acronym) for acronym in acronyms_list if record.get(acronym)]

# Names
labels = ["uo_lib", "uo_lib_officiel", "uo_lib_en"]
names += [record.get(name) for name in labels if record.get(name)]
names_list = ["usualname", "officialname", "nameen"]
names = [record.get(name) for name in names_list if record.get(name)]

short_name = record.get("shortname")
if short_name:
if short_name.isalnum():
acronyms.append(short_name)
else:
names.append(short_name)

acronyms = list(set(acronyms))
names = list(set(names))
names = list(set(names) - set(acronyms))

# Cities, country_alpha2, and zone_emploi
cities, country_alpha2, zone_emploi = [], [], []
city = record.get("com_nom")
city_code = record.get("com_code")
country = record.get("pays_etranger_acheminement")
# City
localisation = json.loads(record.get("currentlocalisation") or "{}")
city = record.get("com_nom") or localisation.get("city") or localisation.get("locality")
if city:
clean_city = " ".join([s for s in city.split(" ") if s.isalpha()])
city = clean_city if clean_city else city
cities.append(city)
if city_code in city_zone_emploi:
zone_emploi += city_zone_emploi[city_code]

# Zone emploi (+ academie + urban unit)
zone_emploi = []
city_code = record.get("cityid")
if city_code in city_zone_emploi:
zone_emploi += city_zone_emploi[city_code]
academie = record.get("aca_nom")
if academie:
zone_emploi.append(academie)
urban_unit = record.get("uucr_nom")
if urban_unit:
zone_emploi.append(urban_unit)

# Countries
country = record.get("country")
country_alpha3 = localisation.get("iso3")
if country:
alpha2 = get_alpha2_from_french(country)
country_alpha2.append(alpha2)
country_alpha2 = get_alpha2_from_french(country)

name_acronym_city[current_id]["city"] = clean_list(data=cities)
name_acronym_city[current_id]["zone_emploi"] = clean_list(zone_emploi)
name_acronym_city[current_id]["acronym"] = clean_list(data=acronyms, ignored=ACRONYM_IGNORED, min_character=2)
name_acronym_city[current_id]["name"] = clean_list(data=names, stopwords=FRENCH_STOP, min_token=2)
country_alpha2 = clean_list(data=country_alpha2)
if not country_alpha2:
country_alpha2 = ["fr"]
name_acronym_city[current_id]["country_alpha2"] = country_alpha2[0]
name_acronym_city[current_id]["country"] = clean_list([country]) if country else []
name_acronym_city[current_id]["country_alpha2"] = clean_list([country_alpha2]) if country_alpha2 else []
name_acronym_city[current_id]["country_alpha3"] = clean_list([country_alpha3]) if country_alpha2 else []
name_acronym_city[current_id]["city"] = clean_list([city]) if city else []
name_acronym_city[current_id]["zone_emploi"] = clean_list(zone_emploi)

logger.debug("Transform records to elastic indexes")
es_paysages = []
for record in records:
paysage_id = record.get("identifiant_interne")
paysage_id = record.get("id")
es_paysage = {"id": paysage_id}
# Acronyms & names
es_paysage["acronym"] = name_acronym_city[paysage_id]["acronym"]
@@ -256,7 +340,7 @@ def transform_data(records: list) -> list:
# Url
url = record.get("url")
if isinstance(url, list):
raise Exception("found list url", url)
raise Exception("Found list url", url)
if url:
es_paysage["web_url"] = clean_url(url)
es_paysage["web_domain"] = get_url_domain(url)
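
Note: the new download_categories pages through the paysage index with the Elasticsearch scroll API. Below is a minimal sketch of that pattern factored into a reusable helper. ES_PAYSAGE_URL, ES_PAYSAGE_TOKEN, the index name, and the query shape come from the diff above; the helper name scroll_all and the final clear-scroll request are assumptions (the commit lets the scroll context expire after its one-minute keep-alive). Unlike the loop in the diff, this variant also stops cleanly when the index returns no hits at all.

import os
import requests

ES_URL = os.getenv("ES_PAYSAGE_URL")
ES_TOKEN = os.getenv("ES_PAYSAGE_TOKEN")
HEADERS = {"Authorization": ES_TOKEN}

def scroll_all(index: str, query: dict, keep_alive: str = "1m", size: int = 10000) -> list:
    """Collect every hit for a query by walking the Elasticsearch scroll API."""
    body = {"size": size, **query}
    res = requests.post(f"{ES_URL}/{index}/_search?scroll={keep_alive}", headers=HEADERS, json=body)
    res.raise_for_status()
    page = res.json()
    scroll_id = page["_scroll_id"]
    hits = page["hits"]["hits"]
    while True:
        res = requests.post(
            f"{ES_URL}/_search/scroll",
            headers=HEADERS,
            json={"scroll": keep_alive, "scroll_id": scroll_id},
        )
        res.raise_for_status()
        page = res.json()
        batch = page["hits"]["hits"]
        if not batch:
            # An empty page means the scroll is exhausted
            break
        hits += batch
        scroll_id = page["_scroll_id"]
    # Clearing the scroll frees server-side resources early (not done in the commit)
    requests.delete(f"{ES_URL}/_search/scroll", headers=HEADERS, json={"scroll_id": scroll_id})
    return [hit["_source"] for hit in hits]

Used as in the diff, this would be: sources = scroll_all("paysage", {"_source": ["id", "category"], "query": {"match": {"type": "structures"}}}) followed by categories = {s["id"]: s["category"] for s in sources}.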
4 changes: 2 additions & 2 deletions project/server/main/my_elastic.py
@@ -65,8 +65,8 @@ def update_index_alias(self, my_alias, new_index):
logger.debug(f'update_index_alias {my_alias} {new_index}')
old_index = None
aliases_data = self.indices.get_alias('*')
logger.debug('aliases_data')
logger.debug(aliases_data)
# logger.debug('aliases_data')
# logger.debug(aliases_data)
for idx in aliases_data:
if my_alias in list(aliases_data[idx]['aliases'].keys()):
old_index = idx
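
For context on the function touched here: update_index_alias repoints a matcher alias from the previous index to the freshly built one. The commit itself only silences two debug lines, but the underlying operation is an atomic alias swap, sketched below with illustrative index and alias names, assuming the pre-8.x elasticsearch-py client this repository imports elsewhere.

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")  # illustrative connection

# Remove and add in a single _aliases call so the alias is never left dangling.
# The index and alias names below are made up for the example.
es.indices.update_aliases(body={
    "actions": [
        {"remove": {"index": "paysage-20240417", "alias": "matcher-paysage"}},
        {"add": {"index": "paysage-20240418", "alias": "matcher-paysage"}},
    ]
})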
70 changes: 66 additions & 4 deletions scripts/paysage_match_affiliation.py
@@ -11,6 +11,8 @@

ODS_API_KEY = os.getenv("ODS_API_KEY")
ODS_PAYSAGE_KEY = os.getenv("ODS_PAYSAGE_KEY")
ES_URL = os.getenv("ES_PAYSAGE_URL")
ES_TOKEN = os.getenv("ES_PAYSAGE_TOKEN")

AFFILIATION_MATCHER_API = f"{os.getenv('AFFILIATION_MATCHER_URL')}/match"
AFFILIATION_MATCHER_LIST_API = "http://localhost:5004/match_list"
@@ -44,6 +46,23 @@
COL_AFFILIATION_IS_MATCH = "affiliation_is_match"
COL_AFFILIATION_IS_MATCH_OFF = "affiliation_is_match_off"

WANTED_CATEGORIES = [
"Université",
"Établissement public expérimental",
"Établissement supérieur d'architecture",
"Organisme de recherche",
"Société d'accélération du transfert de technologies",
"Établissement d'enseignement supérieur privé d'intérêt général",
"Tutelle des établissements",
"Incubateur public",
"Liste des établissements publics relevant du ministre chargé de l'Enseignement supérieur",
"Etablissements d’enseignement supérieur techniques privés (hors formations relevant du commerce et de la gestion)",
"Etablissement publics d’enseignement supérieur entrant dans la cotutelle du ministre chargé de l’enseignement supérieur (Art L 123-1 du code de l’éducation)",
"Commerce et gestion - Etablissements d’enseignement supérieur techniques privés et consulaires autorisés à délivrer un diplôme visé par le ministre chargé de l’enseignement supérieur et/ou à conférer le grade universitaire",
"Opérateur du programme 150 - Formations supérieures et recherche universitaire",
"Structure de recherche",
# "Établissement d'enseignement supérieur étranger"
]

class MATCH(Enum):
NO_ID_NO_MATCH = 0
@@ -76,6 +95,45 @@ def ods_get_df(ods_key: str, subset=None):
return df


def download_categories() -> dict:
keep_alive = 1
scroll_id = None
categories = {}
hits = []
size = 10000
count = 0
total = 0
headers = {"Authorization": ES_TOKEN}
url = f"{ES_URL}/paysage/_search?scroll={keep_alive}m"
query = {
"size": size,
"_source": ["id", "category"],
"query": {"match": {"type": "structures"}},
}

# Scroll to get all results
while total == 0 or count < total:
if scroll_id:
url = f"{ES_URL}/_search/scroll"
query = {"scroll": f"{keep_alive}m", "scroll_id": scroll_id}
res = requests.post(url=url, headers=headers, json=query)
if res.status_code == 200:
res_json = res.json()
scroll_id = res_json.get("_scroll_id")
total = res_json.get("hits").get("total").get("value")
data = res_json.get("hits").get("hits")
count += len(data)
sources = [d.get("_source") for d in data]
hits += sources
else:
print(f"Elastic error {res.status_code}: stop scroll ({count}/{total})")
break

if hits:
categories = {item["id"]: item["category"] for item in hits}
return categories


def paysage_get_names(paysage_row: pd.Series, use_acronym=False):
names = paysage_row[SUBSET_NAMES]
names = names.where(~names.duplicated(), "").to_list()
@@ -114,7 +172,7 @@ def affiliation_get_matches(affiliation: str, year=None):
if res.status_code == 202:
return res.json().get("results")

raise Exception("ERROR_{res.status_code}")
raise Exception(f"ERROR_{res.status_code}")


# api/match_list
@@ -240,9 +298,13 @@ def paysage_match_affiliations(match_type: str, use_match_list=False, use_acrony
df = ods_get_df(ODS_PAYSAGE_KEY, SUBSET_ALL)
print(f"Found {len(df)} structures.")

# Filter with France structures
df = df[df[COL_COUNTRY] == "France"].replace(np.nan, None).copy()
print(f"Found {len(df)} french structures.")
# Download categories
categories = download_categories()
df["category"] = df["identifiant_interne"].apply(lambda x: categories.get(x))

# Filter wanted categories
df = df[df["category"].isin(WANTED_CATEGORIES)].copy()
print(f"Filter {len(df)} entries with wanted categories")

# Create affiliations strings
df[SUBSET_AFFILIATION_STR] = df.apply(paysage_get_affiliations, use_acronym=use_acronym, axis=1)

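A note on the category join used in both load_paysage.py and this script: df[col].apply(lambda x: categories.get(x)) is equivalent to Series.map with a dict, which reads more directly. Missing ids come back as NaN rather than None, but the subsequent isin filter treats both the same. A small illustration with made-up ids:

import pandas as pd

categories = {"P001": "Université", "P002": "Structure de recherche"}  # illustrative ids
df = pd.DataFrame({"identifiant_interne": ["P001", "P002", "P003"]})

# Series.map with a dict behaves like .apply(lambda x: categories.get(x))
df["category"] = df["identifiant_interne"].map(categories)

wanted = df[df["category"].isin(["Université", "Structure de recherche"])].copy()
print(wanted)  # P003 is dropped: its category is NaN, never in the wanted list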