Merge commit '0c9e2fa67c68bb16580e56bddc467868071c56c9'
ahonestla committed Apr 23, 2024
2 parents f3310c7 + 0c9e2fa commit b10d6ca
Showing 5 changed files with 426 additions and 454 deletions.
399 changes: 150 additions & 249 deletions notebooks/paysage_analyse_match.ipynb

Large diffs are not rendered by default.

264 changes: 148 additions & 116 deletions project/server/main/load_paysage.py
@@ -1,12 +1,8 @@
 import os
 import datetime
 import pandas as pd
-import numpy as np
 import requests
 from elasticsearch.client import IndicesClient
 
-ODS_KEY = os.getenv("ODS_KEY")
-ODS_PAYSAGE = "fr_esr_paysage_structures_all"
-
 from project.server.main.elastic_utils import (
     get_analyzers,
     get_tokenizers,
@@ -30,10 +26,33 @@
 logger = get_logger(__name__)
 
 SOURCE = "paysage"
+PAYSAGE_API_URL = "https://paysage-api.staging.dataesr.ovh"
+PAYSAGE_API_KEY = os.getenv("PAYSAGE_API_KEY")
+
+CATEGORIES = {
+    "mCpLW": "Université",
+    "Eg7tX": "Établissement public expérimental",
+    "93BR1": "Établissement supérieur d'architecture",
+    "2ZdzP": "Organisme de recherche",
+    "MTFHZ": "Société d'accélération du transfert de technologies",
+    "UfEnK": "Établissement d'enseignement supérieur privé d'intérêt général",
+    "Sv5bb": "Tutelle des établissements",
+    "mNJ1Z": "Incubateur public",
+    "WCat8": "Liste des établissements publics relevant du ministre chargé de l'Enseignement supérieur",
+    "fQ6GL": "Etablissements d’enseignement supérieur techniques privés (hors formations relevant du commerce et de la gestion)",
+    "WkSgR": "Etablissement publics d’enseignement supérieur entrant dans la cotutelle du ministre chargé de l’enseignement supérieur (Art L 123-1 du code de l’éducation)",
+    "YNqFb": "Commerce et gestion - Etablissements d’enseignement supérieur techniques privés et consulaires autorisés à délivrer un diplôme visé par le ministre chargé de l’enseignement supérieur et/ou à conférer le grade universitaire",
+    "iyn79": "Opérateur du programme 150 - Formations supérieures et recherche universitaire",
+    "z367d": "Structure de recherche",
+    "NsMkU": "Établissement d'enseignement supérieur étranger",
+}
 
 
 def load_paysage(index_prefix: str = "matcher") -> dict:
+    """Load paysage data to elastic indexes"""
+
+    logger.debug("Start loading Paysage data...")
+
     es = MyElastic()
     indices_client = IndicesClient(es)
     settings = {
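
Note: the keys in CATEGORIES above are opaque Paysage category identifiers. download_data() further down joins them into repeated relatedObjectId filters on the /relations endpoint; a minimal sketch of the resulting query string, assuming the dict as defined:

# Sketch: how CATEGORIES feeds the /relations query assembled in download_data() below.
filters = "&".join(f"filters[relatedObjectId]={category}" for category in CATEGORIES)
# -> "filters[relatedObjectId]=mCpLW&filters[relatedObjectId]=Eg7tX&..."
url = f"{PAYSAGE_API_URL}/relations?limit=10000&filters[relationTag]=structure-categorie&{filters}"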
@@ -48,12 +67,12 @@ def load_paysage(index_prefix: str = "matcher") -> dict:
         "id",
         "city",
         "zone_emploi",
-        "country_code",
         "acronym",
-        "year",
         "name",
+        "year",
         "wikidata",
+        "country",
+        "country_alpha2",
+        "country_alpha3",
         "web_url",
         "web_domain",
     ]
@@ -62,21 +81,17 @@ def load_paysage(index_prefix: str = "matcher") -> dict:
         "id": "light",
         "city": "city_analyzer",
         "zone_emploi": "city_analyzer",
-        "country_code": "light",
         "acronym": "acronym_analyzer",
         "name": "heavy_fr",
-        "name_txt": "heavy_fr",
         "wikidata": "wikidata_analyzer",
         "year": "light",
+        "country": "light",
+        "country_alpha2": "light",
+        "country_alpha3": "light",
         "web_url": "url_analyzer",
         "web_domain": "domain_analyzer",
     }
     criteria = exact_criteria + txt_criteria
-    criteria_unique = []
-    for c in criteria_unique:
-        criteria.append(f"{c}_unique")
-        analyzers[f"{c}_unique"] = analyzers[c]
 
     logger.debug(f"Criteria {criteria}")
 
     # Create Elastic Search index
@@ -88,10 +103,12 @@ def load_paysage(index_prefix: str = "matcher") -> dict:
         es_data[criterion] = {}
 
     # Download paysage data
-    raw_records = download_data()
+    raw_data = download_data()
+    if not raw_data:
+        logger.error("Loading aborted: no paysage data")
 
     # Transform paysage data
-    transformed_data = transform_data(raw_records)
+    transformed_data = transform_data(raw_data)
 
     # Iterate over paysage data
     logger.debug("Prepare data for elastic")
@@ -107,42 +124,41 @@ def load_paysage(index_prefix: str = "matcher") -> dict:
                 if criterion_value not in es_data[criterion]:
                     es_data[criterion][criterion_value] = []
                 es_data[criterion][criterion_value].append(
-                    {"id": data_point["id"], "country_alpha2": data_point["country_alpha2"]}
+                    {
+                        "id": data_point["id"],
+                    }
                 )
-    # Add unique criterion
-    for criterion in criteria_unique:
-        for criterion_value in es_data[criterion]:
-            if len(es_data[criterion][criterion_value]) == 1:
-                if f"{criterion}_unique" not in es_data:
-                    es_data[f"{criterion}_unique"] = {}
-                es_data[f"{criterion}_unique"][criterion_value] = es_data[criterion][criterion_value]
 
     # Bulk insert data into ES
     actions = []
     results = {}
-    for criterion in es_data:
+    for criterion in list(es_data.keys()):
         index = get_index_name(index_name=criterion, source=SOURCE, index_prefix=index_prefix)
         analyzer = analyzers[criterion]
         results[index] = len(es_data[criterion])
         for criterion_value in es_data[criterion]:
-            # if criterion in ['name']:
-            #     tokens = get_tokens(indices_client, analyzer, index, criterion_value)
-            #     if len(tokens) < 2:
-            #         logger.debug(f'Not indexing {criterion_value} (not enough token to be relevant !)')
-            #         continue
             action = {
                 "_index": index,
                 "paysages": [k["id"] for k in es_data[criterion][criterion_value]],
-                "country_alpha2": list(set([k["country_alpha2"] for k in es_data[criterion][criterion_value]])),
             }
             if criterion in exact_criteria:
                 action["query"] = {
-                    "match_phrase": {"content": {"query": criterion_value, "analyzer": analyzer, "slop": 1}}
+                    "match_phrase": {
+                        "content": {
+                            "query": criterion_value,
+                            "analyzer": analyzer,
+                            "slop": 1,
+                        }
+                    }
                 }
             elif criterion in txt_criteria:
                 action["query"] = {
                     "match": {
-                        "content": {"query": criterion_value, "analyzer": analyzer, "minimum_should_match": "-10%"}
+                        "content": {
+                            "query": criterion_value,
+                            "analyzer": analyzer,
+                            "minimum_should_match": "-10%",
+                        }
                     }
                 }
             actions.append(action)
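
These actions index queries as documents, in the style of Elasticsearch percolation (the index mappings set up through elastic_utils presumably declare the query field accordingly): "slop": 1 lets a phrase match tolerate one displaced token, and "minimum_should_match": "-10%" lets a match query ignore up to 10% of its tokens. One assembled action might look like this (index name, id and value are hypothetical):

# Illustrative bulk action for an exact criterion (hypothetical values):
action = {
    "_index": "matcher_paysage_name",
    "paysages": ["abc12"],
    "query": {
        "match_phrase": {
            "content": {"query": "universite paris saclay", "analyzer": "heavy_fr", "slop": 1}
        }
    },
}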
@@ -152,18 +168,34 @@ def load_paysage(index_prefix: str = "matcher") -> dict:
 
 
 def download_data() -> list:
-    logger.debug(f"Download Paysage data from {ODS_PAYSAGE}")
-    data = pd.read_csv(
-        f"https://data.enseignementsup-recherche.gouv.fr/explore/dataset/{ODS_PAYSAGE}/download/?format=csv&apikey={ODS_KEY}",
-        sep=";",
-        low_memory=False,
-    )
-    records = data.replace(np.nan, None).to_dict(orient="records")
-    return records
+    """Request paysage data from api"""
+    logger.debug("Start requesting paysage api")
+
+    # Request data
+    limit = 10000
+    filters = "&".join([f"filters[relatedObjectId]={category}" for category in list(CATEGORIES.keys())])
+    url = f"{PAYSAGE_API_URL}/relations?limit={limit}&filters[relationTag]=structure-categorie&{filters}"
+    headers = {"X-API-KEY": PAYSAGE_API_KEY}
+    response = requests.get(url=url, headers=headers)
+
+    if response.status_code != 200:
+        logger.error(f"Error {response.status_code} requesting {url}")
+        return None
+
+    data = response.json().get("data")
+    logger.debug(f"Found {len(data)} paysage records for {len(CATEGORIES)} categories")
+
+    data = pd.DataFrame(data).drop_duplicates(subset="resourceId").to_dict(orient="records")
+    logger.debug(f"Keep {len(data)} paysage records without duplicates")
+
+    return data
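
transform_data() below reads resourceId, resource.currentName, resource.currentLocalisation, resource.websites and the creation/closure dates from each relation record, which suggests a response shape roughly like this (inferred from the code, not from the API documentation):

# Assumed shape of one /relations record, inferred from the fields accessed in transform_data:
record = {
    "resourceId": "abc12",
    "resource": {
        "currentName": {"usualName": "...", "officialName": "...", "acronymFr": "..."},
        "currentLocalisation": {"locality": "...", "postalCode": "...", "country": "...", "iso3": "..."},
        "websites": [{"url": "https://example.org"}],
        "creationDate": "2010-01-01",
        "closureDate": None,
    },
}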


-def transform_data(records: list) -> list:
-    logger.debug(f"Start transform of Paysage data ({len(records)} records)")
+def transform_data(data: list) -> list:
+    """Transform paysage data to elastic data"""
+
+    logger.debug(f"Start transform of Paysage data ({len(data)} records)")
+    es_records = []
 
     # Loading zone emploi data
     logger.debug(f"Load insee data")
@@ -176,90 +208,90 @@ def transform_data(data: list) -> list:
 
     # Setting a dict with all names, acronyms and cities
     logger.debug("Get data from Paysage records")
-    name_acronym_city = {}
-    for record in records:
-        current_id = record["identifiant_interne"]
-        name_acronym_city[current_id] = {}
+    for record in data:
+        current_id = record["resourceId"]
+        es_record = {"id": current_id}
+
+        resource = record["resource"]
+        naming = resource.get("currentName", {})
+        localisation = resource.get("currentLocalisation", {})
 
         # Acronyms
-        acronyms, names = [], []
-        sigle = record.get("sigle")
-        name_short = record.get("nom_court")
-        if sigle:
-            acronyms.append(sigle)
-        if name_short:
-            if name_short.isalnum():
-                acronyms.append(name_short)
-            else:
-                names.append(name_short)
+        acronyms_list = ["acronymFr", "acronymEn", "acronymLocal"]
+        acronyms = [naming.get(acronym) for acronym in acronyms_list if naming.get(acronym)]
 
         # Names
-        labels = ["uo_lib", "uo_lib_officiel", "uo_lib_en"]
-        names += [record.get(name) for name in labels if record.get(name)]
+        names_list = ["usualName", "officialName", "nameEn"]
+        names = [naming.get(name) for name in names_list if naming.get(name)]
+
+        short_name = naming.get("shortName")
+        if short_name:
+            if short_name.isalnum():
+                acronyms.append(short_name)
+            else:
+                names.append(short_name)
 
         acronyms = list(set(acronyms))
-        names = list(set(names))
+        names = list(set(names) - set(acronyms))
 
-        # Cities, country_alpha2, and zone_emploi
-        cities, country_alpha2, zone_emploi = [], [], []
-        city = record.get("com_nom")
-        city_code = record.get("com_code")
-        country = record.get("pays_etranger_acheminement")
+        # City
+        city = localisation.get("locality")
         if city:
-            clean_city = " ".join([s for s in city.split(" ") if s.isalpha()])
+            clean_city = " ".join([s for s in city.split(" ") if s.isalpha() and s.lower() != "cedex"])
             city = clean_city if clean_city else city
-            cities.append(city)
-        if city_code in city_zone_emploi:
-            zone_emploi += city_zone_emploi[city_code]
-        if country:
-            alpha2 = get_alpha2_from_french(country)
-            country_alpha2.append(alpha2)
-
-        name_acronym_city[current_id]["city"] = clean_list(data=cities)
-        name_acronym_city[current_id]["zone_emploi"] = clean_list(zone_emploi)
-        name_acronym_city[current_id]["acronym"] = clean_list(data=acronyms, ignored=ACRONYM_IGNORED, min_character=2)
-        name_acronym_city[current_id]["name"] = clean_list(data=names, stopwords=FRENCH_STOP, min_token=2)
-        country_alpha2 = clean_list(data=country_alpha2)
-        if not country_alpha2:
-            country_alpha2 = ["fr"]
-        name_acronym_city[current_id]["country_alpha2"] = country_alpha2[0]
-
-    logger.debug("Transform records to elastic indexes")
-    es_paysages = []
-    for record in records:
-        paysage_id = record.get("identifiant_interne")
-        es_paysage = {"id": paysage_id}
-        # Acronyms & names
-        es_paysage["acronym"] = name_acronym_city[paysage_id]["acronym"]
-        names = name_acronym_city[paysage_id]["name"]
-        es_paysage["name"] = list(set(names) - set(es_paysage["acronym"]))
-        # Addresses
-        es_paysage["city"] = name_acronym_city[paysage_id]["city"]
-        es_paysage["country_alpha2"] = name_acronym_city[paysage_id]["country_alpha2"]
-        es_paysage["country_code"] = [name_acronym_city[paysage_id]["country_alpha2"]]
-        # Zone emploi
-        es_paysage["zone_emploi"] = name_acronym_city[paysage_id]["zone_emploi"]
-        # Wikidata
-        wikidata = record.get("identifiant_wikidata")
-        if wikidata:
-            es_paysage["wikidata"] = wikidata
+
+        # Zone emploi (+ academie + urban unit)
+        zone_emploi = []
+        city_code = localisation.get("postalCode")
+        if city_code in city_zone_emploi:
+            zone_emploi += city_zone_emploi[city_code]
+
+        # Countries
+        country = localisation.get("country")
+        country_alpha2 = get_alpha2_from_french(country) if country else None
+        country_alpha3 = localisation.get("iso3")
 
         # Dates
-        last_year = f"{datetime.date.today().year}"
-        start_date = record.get("date_creation")
+        start_date = resource.get("creationDate")
         if not start_date:
             start_date = "2010"
         start = int(start_date[0:4])
-        end_date = record.get("date_fermeture")
+        end_date = resource.get("closureDate")
         if not end_date:
-            end_date = last_year
+            end_date = f"{datetime.date.today().year}"
         end = int(end_date[0:4])
         # Start date one year before official as it can be used before sometimes
-        es_paysage["year"] = [str(y) for y in list(range(start - 1, end + 1))]
-        # Url
-        url = record.get("url")
-        if isinstance(url, list):
-            raise Exception("found list url", url)
-        if url:
-            es_paysage["web_url"] = clean_url(url)
-            es_paysage["web_domain"] = get_url_domain(url)
-
-        es_paysages.append(es_paysage)
-    return es_paysages
+        years = [str(y) for y in list(range(start - 1, end + 1))]
+
+        # Websites
+        web_urls, web_domains = [], []
+        websites = resource.get("websites", [])
+        for website in websites:
+            url = website.get("url")
+            if url:
+                web_urls.append(clean_url(url))
+                web_domains.append(get_url_domain(url))
+        web_urls = list(set(web_urls))
+        web_domains = list(set(web_domains))
+
+        # Elastic record
+        es_record["acronym"] = clean_list(
+            data=acronyms,
+            stopwords=FRENCH_STOP,
+            ignored=ACRONYM_IGNORED,
+            min_character=2,
+        )
+        es_record["name"] = clean_list(data=names, stopwords=FRENCH_STOP, min_token=2)
+        es_record["city"] = clean_list(data=city, stopwords=FRENCH_STOP, min_character=2)
+        es_record["zone_emploi"] = clean_list(data=zone_emploi, stopwords=FRENCH_STOP)
+        es_record["country"] = clean_list(data=country, stopwords=FRENCH_STOP)
+        es_record["country_alpha2"] = clean_list(data=country_alpha2, stopwords=FRENCH_STOP)
+        es_record["country_alpha3"] = clean_list(data=country_alpha3, stopwords=FRENCH_STOP)
+        es_record["year"] = years
+        es_record["web_url"] = web_urls
+        es_record["web_domain"] = web_domains
+
+        es_records.append(es_record)
+
+    return es_records
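
End to end, one transformed record should come out roughly as follows (values are illustrative; the exact normalization depends on clean_list):

# Illustrative output of transform_data for one structure (hypothetical values):
es_record = {
    "id": "abc12",
    "acronym": ["UPS"],
    "name": ["Université Paris-Saclay"],
    "city": ["Gif-sur-Yvette"],
    "zone_emploi": ["Saclay"],
    "country": ["France"],
    "country_alpha2": ["fr"],
    "country_alpha3": ["FRA"],
    "year": [str(y) for y in range(2019, 2025)],  # created 2020, still open in 2024
    "web_url": ["universite-paris-saclay.fr"],
    "web_domain": ["universite-paris-saclay.fr"],
}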
20 changes: 11 additions & 9 deletions project/server/main/match_paysage.py
@@ -4,15 +4,17 @@
 from project.server.main.utils import FRENCH_STOP, remove_ref_index
 
 DEFAULT_STRATEGIES = [
-    [["paysage_id"]],
-    [["paysage_acronym", "paysage_name", "paysage_zone_emploi"], ["paysage_acronym", "paysage_name", "paysage_city"]],
-    [["paysage_name", "paysage_acronym"]],
-    [["paysage_name", "paysage_web_url"]],
-    [["paysage_name", "paysage_zone_emploi"], ["paysage_name", "paysage_city"]],
-    [["paysage_acronym", "paysage_web_url"]],
-    [["paysage_acronym", "paysage_zone_emploi"], ["paysage_acronym", "paysage_city"]],
-    [["paysage_name", "paysage_acronym"]],
-    [["paysage_acronym", "paysage_city"]],
+    [
+        ["paysage_id"],
+        ["paysage_acronym", "paysage_name", "paysage_city"],
+        ["paysage_acronym", "paysage_name", "paysage_zone_emploi"],
+        ["paysage_name", "paysage_acronym"],
+        ["paysage_name", "paysage_zone_emploi"],
+        ["paysage_name", "paysage_city"],
+        ["paysage_acronym", "paysage_zone_emploi"],
+        ["paysage_acronym", "paysage_city"],
+        ["paysage_acronym", "paysage_city"],
+    ]
 ]

STOPWORDS_STRATEGIES = {"paysage_name": FRENCH_STOP}
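
Each top-level entry of DEFAULT_STRATEGIES is a group of criteria combinations, and this change collapses the previous chain of single-combination groups into one group holding all combinations, ordered from most to least specific. A minimal sketch of how such a nested list could be walked (a hypothetical driver, not this project's actual matcher code):

# Hypothetical illustration of the nested strategy-list shape (not the real matcher):
def first_matching_group(strategies, match_one):
    for group in strategies:
        for combination in group:  # e.g. ["paysage_name", "paysage_city"]
            result = match_one(combination)
            if result:
                return result
    return None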