From f91837ef2d8b72e8e222a7db2adc0900e09dac7f Mon Sep 17 00:00:00 2001 From: JoschD <26184899+JoschD@users.noreply.github.com> Date: Tue, 27 Sep 2022 11:49:57 +0200 Subject: [PATCH] Knob extractor pandafied (#399) * Everything is tfs-pandas * Time in the TFS-Header * "Others" category printed only when present * Version bump and changelog --- CHANGELOG.md | 8 ++ omc3/__init__.py | 2 +- omc3/knob_extractor.py | 216 +++++++++++++++++++---------- tests/unit/test_knob_extractor.py | 223 ++++++++++++++++++++++-------- 4 files changed, 316 insertions(+), 133 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dd82001be..3fabb4b68 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # OMC3 Changelog +#### 2022-09-27 - v0.6.3 + +- Pandafied `knob_extractor` internally and python output. + +#### 2022-09-22 - v0.6.2 + +- Cleaned logging in `knob_extractor` + #### 2022-09-21 - v0.6.1 - Added: diff --git a/omc3/__init__.py b/omc3/__init__.py index 53c35939d..74c0be6fb 100644 --- a/omc3/__init__.py +++ b/omc3/__init__.py @@ -11,7 +11,7 @@ __title__ = "omc3" __description__ = "An accelerator physics tools package for the OMC team at CERN." __url__ = "https://github.com/pylhc/omc3" -__version__ = "0.6.2" +__version__ = "0.6.3" __author__ = "pylhc" __author_email__ = "pylhc@github.com" __license__ = "MIT" diff --git a/omc3/knob_extractor.py b/omc3/knob_extractor.py index 9bffcd55d..3c8eb81f7 100644 --- a/omc3/knob_extractor.py +++ b/omc3/knob_extractor.py @@ -67,16 +67,15 @@ import logging import math import re -from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Sequence, Tuple, Union import pandas as pd -import tfs from dateutil.relativedelta import relativedelta -from generic_parser import EntryPointParameters, entrypoint +import tfs +from generic_parser import EntryPointParameters, entrypoint from omc3.utils.iotools import PathOrStr, PathOrStrOrDataFrame from omc3.utils.logging_tools import get_logger from omc3.utils.mock import cern_network_import @@ -92,6 +91,20 @@ MINUS_CHARS: Tuple[str, ...] = ("_", "-") + +class Col: + """ DataFrame Columns used in this script. """ + madx: str = "madx" + lsa: str = "lsa" + scaling: str = "scaling" + value: str = "value" + + +class Head: + """ TFS Headers used in this script.""" + time: str = "EXTRACTION_TIME" + + KNOB_CATEGORIES: Dict[str, List[str]] = { "sep": [ "LHCBEAM:IP1-SEP-H-MM", @@ -204,22 +217,6 @@ def get_params(): ) -@dataclass -class KnobEntry: - madx: str # the name of the MAD-X variable for this knob - lsa: str # the name of the knob in LSA itself - scaling: float # is usually +-1, i.e. takes care of sign-conventions - value: float = None - - def get_madx_command(self) -> str: - if self.value is None: - return f"! {self.madx} : No Value extracted" - return f"{self.madx} := {self.value * self.scaling};" - - -KnobsDict = Dict[str, KnobEntry] - - @entrypoint( get_params(), strict=True, argument_parser_args=dict( @@ -228,7 +225,7 @@ def get_madx_command(self) -> str: prog="Knob Extraction Tool." ) ) -def main(opt) -> Optional[KnobsDict]: +def main(opt) -> Optional[tfs.TfsDataFrame]: """ Main knob extracting function. """ ldb = pytimber.LoggingDB(source="nxcals", loglevel=logging.ERROR) time = _parse_time(opt.time, opt.timedelta) @@ -243,39 +240,36 @@ def main(opt) -> Optional[KnobsDict]: return None knobs_dict = _parse_knobs_defintions(opt.knob_definitions) - knobs_extract = _extract(ldb, knobs_dict, opt.knobs, time) + knobs_extract = _extract_and_gather(ldb, knobs_dict, opt.knobs, time) if opt.output: - _write_knobsfile(opt.output, knobs_extract, time) + _write_knobsfile(opt.output, knobs_extract) return knobs_extract -def _extract(ldb, knobs_dict: KnobsDict, knob_categories: Sequence[str], time: datetime) -> KnobsDict: +def extract(ldb, knobs: Sequence[str], time: datetime) -> Dict[str, float]: """ - Main function to gather data from the state-tracker. + Standalone function to gather data from the StateTracker. + Extracts data via pytimber's LoggingDB for the knobs given + (either by name or by category) in knobs. Args: ldb (pytimber.LoggingDB): The pytimber database. - knobs_dict (KnobsDict): A mapping of all knob-names to KnobEntries. - knob_categories (Sequence[str]): Knob Categories or Knob-Names to extract. + knobs (Sequence[str]): Knob Categories or Knob-Names to extract. time (datetime): The time, when to extract. Returns: - Dict[str, KnobsDict]: Contains all the extracted knobs, grouped by categories. - When extraction was not possible, the value attribute of the respective KnobEntry is still None + Dict[str, float]: Contains all the extracted knobs. + When extraction was not possible, the value is None. """ LOGGER.info(f"---- EXTRACTING KNOBS @ {time} ----") - knobs = {} + knobs_extracted = {} - for category in knob_categories: + for category in knobs: for knob in KNOB_CATEGORIES.get(category, [category]): - try: - knobs[knob] = knobs_dict[knob] - except KeyError as e: - raise KeyError(f"Knob '{knob}' not found in the knob-definitions!") from e - - # LOGGER.debug(f"Looking for {knob:<34s} ") # pytimber logs this to info anyway knobkey = f"LhcStateTracker:{knob}:target" + knobs_extracted[knob] = None # to log that this was tried to be extracted. + knobvalue = ldb.get(knobkey, time.timestamp()) # use timestamp to preserve timezone info if knobkey not in knobvalue: LOGGER.warning(f"No value for {knob} found") @@ -292,38 +286,96 @@ def _extract(ldb, knobs_dict: KnobsDict, knob_categories: Sequence[str], time: d continue LOGGER.info(f"Knob value for {knob} extracted: {value} (unscaled)") - knobs[knob].value = value + knobs_extracted[knob] = value + + return knobs_extracted + + +def check_for_undefined_knobs(knobs_definitions: pd.DataFrame, knob_categories: Sequence[str]): + """ Check that all knobs are actually defined in the knobs-definitions. + + + Args: + knobs_definitions (pd.DataFrame): A mapping of all knob-names to KnobEntries. + knob_categories (Sequence[str]): Knob Categories or Knob-Names to extract. + + Raises: + KeyError: If one or more of the knobs don't have a definition. + + """ + knob_names = [knob for category in knob_categories for knob in KNOB_CATEGORIES.get(category, [category])] + undefined_knobs = [knob for knob in knob_names if knob not in knobs_definitions.index] + if undefined_knobs: + raise KeyError( + "The following knob(s) could not be found " + f"in the knob-definitions: '{', '.join(undefined_knobs)}'" + ) + + +def _extract_and_gather(ldb, knobs_definitions: pd.DataFrame, + knob_categories: Sequence[str], + time: datetime) -> tfs.TfsDataFrame: + """ + Main function to gather data from the StateTracker and the knob-definitions. + All given knobs (either in categories or as knob names) to be extracted + are checked for being present in the ``knob_definitions``. + A TfsDataFrame is returned, containing the knob-definitions of the + requested knobs and the extracted value (or NAN if not successful). + + Args: + ldb (pytimber.LoggingDB): The pytimber database. + knobs_definitions (pd.DataFrame): A mapping of all knob-names to KnobEntries. + knob_categories (Sequence[str]): Knob Categories or Knob-Names to extract. + time (datetime): The time, when to extract. + Returns: + tfs.TfsDataframe: Contains all the extracted knobs, in columns containing + their madx-name, lsa-name, scaling and extracted value. + When extraction was not possible, the value of the respective entry is NAN. + + """ + check_for_undefined_knobs(knobs_definitions, knob_categories) + extracted_knobs = extract(ldb, knobs=knob_categories, time=time) + + knob_names = list(extracted_knobs.keys()) + knobs = tfs.TfsDataFrame(index=knob_names, + columns=[Col.lsa, Col.madx, Col.scaling, Col.value], + headers={Head.time: time}) + knobs[[Col.lsa, Col.madx, Col.scaling]] = knobs_definitions.loc[knob_names, :] + knobs[Col.value] = pd.Series(extracted_knobs) return knobs -def _write_knobsfile(output: Union[Path, str], collected_knobs: KnobsDict, time): +def _write_knobsfile(output: Union[Path, str], collected_knobs: tfs.TfsDataFrame): """ Takes the collected knobs and writes them out into a text-file. """ - collected_knobs = collected_knobs.copy() # to not modify the return dict + collected_knobs = collected_knobs.copy() # to not modify the df # Sort the knobs by category - category_knobs = {c: {} for c in KNOB_CATEGORIES.keys()} - for category, names in KNOB_CATEGORIES.items(): - for name in names: - if name in collected_knobs.keys(): - category_knobs[category][name] = collected_knobs.pop(name) - category_knobs["Other Knobs"] = collected_knobs + category_knobs = {} + for category, category_names in KNOB_CATEGORIES.items(): + names = [name for name in collected_knobs.index if name in category_names] + if not names: + continue + + category_knobs[category] = collected_knobs.loc[names, :] + collected_knobs = collected_knobs.drop(index=names) + + if len(collected_knobs): # leftover knobs without category + category_knobs["Other Knobs"] = collected_knobs # Write them out with open(output, "w") as outfile: outfile.write(f"!! --- knobs extracted by knob_extractor\n") - outfile.write(f"!! --- extracted knobs for time {time}\n\n") - for category, knobs in category_knobs.items(): - if not knobs: - continue + outfile.write(f"!! --- extracted knobs for time {collected_knobs.headers[Head.time]}\n\n") + for category, knobs_df in category_knobs.items(): outfile.write(f"!! --- {category:10} --------------------\n") - for knob, knob_entry in knobs.items(): - outfile.write(f"{knob_entry.get_madx_command()}\n") + for knob, knob_entry in knobs_df.iterrows(): + outfile.write(f"{get_madx_command(knob_entry)}\n") outfile.write("\n") outfile.write("\n") -# Knobs Dict ------------------------------------------------------------------- +# Knobs Definitions ------------------------------------------------------------ def _get_knobs_def_file(user_defined: Optional[Union[Path, str]] = None) -> Path: """ Check which knobs-definition file is appropriate to take. """ @@ -343,17 +395,18 @@ def _get_knobs_def_file(user_defined: Optional[Union[Path, str]] = None) -> Path raise FileNotFoundError("None of the knobs-definition files are available.") -def _load_knobs_dict(file_path: Union[Path, str]) -> KnobsDict: - """ Load the knobs-definition file and convert into KnobsDict. - Each line in this file should consist of four comma separated entries: - madx-name, lsa-name, scaling factor, knob-test value. +def load_knobs_definitions(file_path: Union[Path, str]) -> pd.DataFrame: + """ Load the knobs-definition file and convert into a DataFrame. + Each line in this file should consist of at least three comma separated + entries in the following order: madx-name, lsa-name, scaling factor. + Other columns are ignored. Alternatively, a TFS-file is also allowed, but needs to have the suffix ``.tfs``. Args: file_path (Path): Path to the knobs definition file. Returns: - Dictionary with LSA names (but with colon instead of /) as + Dataframe with LSA names (but with colon instead of /) as keys and KnobEntries (without values) as value. """ if Path(file_path).suffix == ".tfs": @@ -361,39 +414,54 @@ def _load_knobs_dict(file_path: Union[Path, str]) -> KnobsDict: df = tfs.read_tfs(file_path) else: # parse csv file (the official way) - dtypes = {"madx": str, "lsa": str, "scaling": float, "test": float} - converters = {'madx': str.strip, 'lsa': str.strip} # strip whitespaces - df = pd.read_csv(file_path, comment="#", names=dtypes.keys(), dtype=dtypes, converters=converters) - return _dataframe_to_knobsdict(df) - - -def _dataframe_to_knobsdict(df: pd.DataFrame) -> KnobsDict: - """ Converts a DataFrame into the required Dictionary structure. + converters = {Col.madx: str.strip, Col.lsa: str.strip} # strip whitespaces + dtypes = {Col.scaling: float} + names = (Col.madx, Col.lsa, Col.scaling) + df = pd.read_csv(file_path, + comment="#", + usecols=list(range(len(names))), # only read the first columns + names=names, + dtype=dtypes, + converters=converters) + return _to_knobs_dataframe(df) + + +def _to_knobs_dataframe(df: pd.DataFrame) -> pd.DataFrame: + """ Adapts a DataFrame to the conventions used here: + StateTracker variable name as index, all columns lower-case. Args: df (pd.DataFrame): DataFrame containing at least the columns 'lsa', 'madx', 'scaling' (upper or lowercase) Returns: - Dictionary with LSA names (but with colon instead of /) as - keys and KnobEntries (without values) as value. + Dataframe with LSA names (but with colon instead of /) as + keys and 'lsa', 'madx', 'scaling' and (empty) 'value' columns. """ df.columns = df.columns.astype(str).str.lower() - df = df[['lsa', 'madx', 'scaling']].set_index("lsa", drop=False) - return { - lsa2name(r[0]): KnobEntry(**r[1].to_dict()) for r in df.iterrows() - } + df = df[[Col.lsa, Col.madx, Col.scaling]].set_index(Col.lsa, drop=False) + df.index = df.index.map(lsa2name) + return df -def _parse_knobs_defintions(knobs_def_input: Optional[Union[Path, str, pd.DataFrame]]) -> KnobsDict: +def _parse_knobs_defintions(knobs_def_input: Optional[Union[Path, str, pd.DataFrame]]) -> pd.DataFrame: """ Parse the given knob-definitions either from a csv-file or from a DataFrame. """ if isinstance(knobs_def_input, pd.DataFrame): - return _dataframe_to_knobsdict(knobs_def_input) + return _to_knobs_dataframe(knobs_def_input) # input points to a file or is None knobs_def_file = _get_knobs_def_file(knobs_def_input) - return _load_knobs_dict(knobs_def_file) + return load_knobs_definitions(knobs_def_file) + + +def get_madx_command(knob_data: pd.Series) -> str: + if Col.value not in knob_data.index: + raise KeyError("Value entry not found in extracted knob_data. " + "Something went wrong as it should at least be NaN.") + if knob_data[Col.value] is None or pd.isna(knob_data[Col.value]): + return f"! {knob_data[Col.madx]} : No Value extracted" + return f"{knob_data[Col.madx]} := {knob_data[Col.value] * knob_data[Col.scaling]};" # Time Tools ------------------------------------------------------------------- diff --git a/tests/unit/test_knob_extractor.py b/tests/unit/test_knob_extractor.py index a201a6aa8..5a055c808 100644 --- a/tests/unit/test_knob_extractor.py +++ b/tests/unit/test_knob_extractor.py @@ -7,17 +7,17 @@ import numpy as np import pandas as pd import pytest -import tfs +import tfs from generic_parser import EntryPoint from generic_parser.dict_parser import ArgumentError from omc3 import knob_extractor -from omc3.knob_extractor import (KNOB_CATEGORIES, KnobEntry, _add_time_delta, - _extract, _parse_knobs_defintions, +from omc3.knob_extractor import (KNOB_CATEGORIES, _add_time_delta, + _extract_and_gather, _parse_knobs_defintions, _parse_time, _write_knobsfile, lsa2name, main, - get_params + get_params, Col, get_madx_command, Head, + check_for_undefined_knobs, load_knobs_definitions ) - from tests.conftest import cli_args INPUTS = Path(__file__).parent.parent / "inputs" / "knob_extractor" @@ -27,12 +27,12 @@ class TestFullRun: @pytest.mark.basic @pytest.mark.parametrize("commandline", [True, False], ids=["as function", "cli"]) def test_full(self, tmp_path, knob_definitions, monkeypatch, commandline): - knobs_dict = _parse_knobs_defintions(knob_definitions) + kobs_defs = _parse_knobs_defintions(knob_definitions) all_variables = [knob for category in KNOB_CATEGORIES.values() for knob in category] for knob in all_variables: value = np.random.random() * 10 - 5 threshold = np.random.random() < 0.3 - knobs_dict[knob].value = 0.0 if threshold else value + kobs_defs.loc[knob, Col.value] = 0.0 if threshold else value start_time = datetime.now().timestamp() path = tmp_path / "knobs.txt" @@ -47,7 +47,7 @@ def get(key, time): now_time = datetime.now().timestamp() assert start_time <= time <= now_time name = ":".join(key.split(":")[1:-1]) - return {key: [[739173129, 42398328], [-1, knobs_dict[name].value]]} + return {key: [[739173129, 42398328], [-1, kobs_defs.loc[name, Col.value]]]} class MockTimber: LoggingDB = MyLDB @@ -66,7 +66,8 @@ class MockTimber: parsed_output, _ = parse_output_file(path) assert len(all_variables) == len(parsed_output) for knob in all_variables: - assert parsed_output[knobs_dict[knob].madx] == knobs_dict[knob].value * knobs_dict[knob].scaling + knob_entry = kobs_defs.loc[knob, :] + assert parsed_output[knob_entry[Col.madx]] == knob_entry[Col.value] * knob_entry[Col.scaling] else: knobs_extracted = main(time="now", output=path, knob_definitions=knob_definitions) @@ -76,9 +77,10 @@ class MockTimber: assert len(all_variables) == len(parsed_output) assert len(knobs_extracted) == len(parsed_output) for knob in all_variables: - assert knobs_dict[knob].value == knobs_extracted[knob].value - assert parsed_output[knobs_extracted[knob].madx] == knobs_extracted[knob].value * knobs_extracted[knob].scaling - + knob_entry = kobs_defs.loc[knob, :] + assert knob_entry[Col.value] == knobs_extracted.loc[knob, Col.value] + assert parsed_output[knob_entry[Col.madx]] == knob_entry[Col.value] * knob_entry[Col.scaling] + @pytest.mark.basic @pytest.mark.parametrize("commandline", [True, False], ids=["as function", "cli"]) def test_state(self, tmp_path, monkeypatch, caplog, commandline): @@ -120,7 +122,7 @@ class MockTimber: assert "The State of the affairs" in caplog.text @pytest.mark.basic - def test_knob_not_defined(self, knob_definitions, monkeypatch): + def test_knob_not_defined_run(self, knob_definitions, monkeypatch): # Mock Pytimber --- class MyLDB: def __init__(self, *args, **kwargs): @@ -128,62 +130,88 @@ def __init__(self, *args, **kwargs): @staticmethod def get(key, time): - raise ValueError("This test failed: The code should not have run this far.") + raise ArgumentError("Got past the KnobCheck!") + # return {key: [[478973], [343.343]]} class MockTimber: LoggingDB = MyLDB monkeypatch.setattr(knob_extractor, "pytimber", MockTimber()) + knob_definitions_df = load_knobs_definitions(knob_definitions) + + # run ------------------------------------------------------------------ + knobs_undefined = ["non_existent_knob", "other_knob"] + knobs_defined = knob_definitions_df.index.tolist() + + # undefined only --- + with pytest.raises(KeyError) as e: + main(knob_definitions=knob_definitions, knobs=knobs_undefined) + + for knob in knobs_undefined: + assert knob in str(e) + + # defined only --- + with pytest.raises(ArgumentError) as e: + main(knob_definitions=knob_definitions, knobs=knobs_defined) + assert "KnobCheck" in str(e) # see mock Pytimber above - # run --- - knob_name = "non_existent_knob" + # both --- with pytest.raises(KeyError) as e: - main(knob_definitions=knob_definitions, knobs=[knob_name]) - assert knob_name in str(e) + main(knob_definitions=knob_definitions, knobs=knobs_undefined+knobs_defined) + + for knob in knobs_undefined: + assert knob in str(e) class TestKnobExtraction: @pytest.mark.basic def test_extraction(self): - knobs_dict = { - "LHCBEAM1:LANDAU_DAMPING": KnobEntry(madx="landau1", lsa="LHCBEAM1/LANDAU_DAMPING", scaling=-1), - "LHCBEAM2:LANDAU_DAMPING": KnobEntry(madx="landau2", lsa="LHCBEAM1/LANDAU_DAMPING", scaling=-1), - "other": KnobEntry(madx="other_knob", lsa="other/knob", scaling=1), - } + knobs_dict = pd.DataFrame({ + "LHCBEAM1:LANDAU_DAMPING": knob_def(madx="landau1", lsa="LHCBEAM1/LANDAU_DAMPING", scaling=-1), + "LHCBEAM2:LANDAU_DAMPING": knob_def(madx="landau2", lsa="LHCBEAM1/LANDAU_DAMPING", scaling=-1), + "other": knob_def(madx="other_knob", lsa="other/knob", scaling=1), + }).transpose() values = [8904238, 34.323904, 3489.23409] time = datetime.now() timestamp = time.timestamp()*1e9 # java format fake_ldb = { f"LhcStateTracker:{key}:target": {f"LhcStateTracker:{key}:target": [[timestamp, timestamp], [-1, value]]} - for key, value in zip(knobs_dict.keys(), values) + for key, value in zip(knobs_dict.index, values) } - extracted = _extract(fake_ldb, knobs_dict=knobs_dict, knob_categories=["mo", "other"], time=time) + extracted = _extract_and_gather(fake_ldb, knobs_definitions=knobs_dict, knob_categories=["mo", "other"], time=time) assert len(extracted) == len(knobs_dict) - for idx, (key, entry) in enumerate(extracted.items()): - assert entry.value == values[idx] # depends on the order of "mo" in the file + for idx, (key, entry) in enumerate(extracted.iterrows()): + assert entry[Col.value] == values[idx] # depends on the order of "mo" in the file class TestIO: @pytest.mark.basic - def test_parse_knobdict_from_file(self, knob_definitions): - knob_dict = _parse_knobs_defintions(knob_definitions) + def test_parse_knobdefs_from_file(self, knob_definitions): + knob_defs = _parse_knobs_defintions(knob_definitions) for knobs in KNOB_CATEGORIES.values(): for knob in knobs: - assert knob in knob_dict.keys() - knob_entry = knob_dict[knob] + assert knob in knob_defs.index + knob_entry = knob_defs.loc[knob, :].copy() + + assert abs(knob_entry[Col.scaling]) == 1 + assert lsa2name(knob_entry[Col.lsa]) == knob + assert len(knob_entry[Col.madx]) - assert knob_entry.value is None - assert abs(knob_entry.scaling) == 1 - assert lsa2name(knob_entry.lsa) == knob - assert len(knob_entry.madx) - assert knob_entry.madx in knob_entry.get_madx_command() - assert knob_entry.get_madx_command().strip().startswith("!") + with pytest.raises(KeyError) as e: + get_madx_command(knob_entry) + assert "Value entry not found" in str(e) - knob_entry.value = 10 - assert str(10) in knob_entry.get_madx_command() + knob_entry[Col.value] = pd.NA + madx_command = get_madx_command(knob_entry) + assert knob_entry[Col.madx] in madx_command + assert madx_command.strip().startswith("!") + + knob_entry[Col.value] = 10 + madx_command = get_madx_command(knob_entry) + assert str(10) in madx_command @pytest.mark.basic def test_parse_knobdict_from_dataframe(self, tmp_path): @@ -192,33 +220,108 @@ def test_parse_knobdict_from_dataframe(self, tmp_path): path = tmp_path / "knob_defs.tfs" tfs.write(path, df) - knob_dict = _parse_knobs_defintions(path) - assert len(knob_dict) == 1 - assert "lsa_name" in knob_dict - knob_entry = knob_dict["lsa_name"] - assert knob_entry.lsa == "lsa_name" - assert knob_entry.madx == "madx_name" - assert knob_entry.scaling == 1 + knob_defs = _parse_knobs_defintions(path) + assert len(knob_defs) == 1 + assert "lsa_name" in knob_defs.index + knob_entry = knob_defs.loc["lsa_name", :] + assert knob_entry[Col.lsa] == "lsa_name" + assert knob_entry[Col.madx] == "madx_name" + assert knob_entry[Col.scaling] == 1 @pytest.mark.basic def test_write_file(self, tmp_path): - knobs_dict = { - "LHCBEAM1:LANDAU_DAMPING": KnobEntry(madx="moknob1", lsa="moknob1.lsa", scaling=-1, value=-4783), - "LHCBEAM2:LANDAU_DAMPING": KnobEntry(madx="moknob2", lsa="moknob2.lsa", scaling=1, value=0.0), # one should be 0.0 to test this case - "knob1": KnobEntry(madx="knob1.madx", lsa="knob1.lsa", scaling=-1, value=12.43383), - "knob2": KnobEntry(madx="knob2.madx", lsa="knob2.lsa", scaling=1, value=-3.0231), - "knob3": KnobEntry(madx="knob3.madx", lsa="knob3.lsa", scaling=-1, value=-9.7492), - } + knobs_defs = pd.DataFrame({ + "LHCBEAM1:LANDAU_DAMPING": knob_def(madx="moknob1", lsa="moknob1.lsa", scaling=-1, value=-4783), + "LHCBEAM2:LANDAU_DAMPING": knob_def(madx="moknob2", lsa="moknob2.lsa", scaling=1, value=0.0), # one should be 0.0 to test this case + "knob1": knob_def(madx="knob1.madx", lsa="knob1.lsa", scaling=-1, value=12.43383), + "knob2": knob_def(madx="knob2.madx", lsa="knob2.lsa", scaling=1, value=-3.0231), + "knob3": knob_def(madx="knob3.madx", lsa="knob3.lsa", scaling=-1, value=-9.7492), + }).transpose() path = tmp_path / "knobs.txt" time = datetime.now() - _write_knobsfile(path, knobs_dict, time=time) + knobs_defs = tfs.TfsDataFrame(knobs_defs, headers={Head.time: time}) + _write_knobsfile(path, knobs_defs) read_as_dict, full_text = parse_output_file(path) assert str(time) in full_text assert " mo " in full_text assert " Other Knobs " in full_text - assert len(read_as_dict) == len(knobs_dict) - for _, entry in knobs_dict.items(): - assert read_as_dict[entry.madx] == entry.value * entry.scaling + assert len(read_as_dict) == len(knobs_defs) + for _, entry in knobs_defs.iterrows(): + assert read_as_dict[entry.madx] == entry[Col.value] * entry[Col.scaling] + + @pytest.mark.basic + def test_knob_not_defined(self, knob_definitions, monkeypatch): + knob_definitions_df = load_knobs_definitions(knob_definitions) + + # run ------------------------------------------------------------------ + knobs_undefined = ["this_knob_does_not_exist", "Knobby_McKnobface"] + knobs_defined = knob_definitions_df.index.tolist() + knob_categories = list(KNOB_CATEGORIES.keys()) + + # undefined only --- + with pytest.raises(KeyError) as e: + check_for_undefined_knobs(knob_definitions_df, knobs_undefined) + + for knob in knobs_undefined: + assert knob in str(e) + + # defined only --- + check_for_undefined_knobs(knob_definitions_df, knobs_defined) + check_for_undefined_knobs(knob_definitions_df, knob_categories) + check_for_undefined_knobs(knob_definitions_df, knobs_defined + knob_categories) + + # all --- + with pytest.raises(KeyError) as e: + check_for_undefined_knobs(knob_definitions_df, + knob_categories + knobs_undefined + knobs_defined) + + for knob in knobs_undefined: + assert knob in str(e) + + @pytest.mark.basic + def test_load_knobdefinitions_with_any_number_entries(self, tmp_path): + definition_file = tmp_path / "knob_defs_tmp.txt" + values = [18.8, 12.0, 10, 108.8] + definition_file.write_text( + f"knob1_madx, knob1/lsa, {values[0]}, 19.8, 38\n" + f"knob2_madx, knob2/lsa, {values[1]}, 483.8\n" + f"knob3_madx, knob3/lsa, {values[2]}\n" + f"knob4_madx, knob4/lsa, {values[3]}, 19.8, other stuff\n" + ) + + df = load_knobs_definitions(definition_file) + assert len(df) == len(values) + + for idx, value in enumerate(values, start=1): + name = f"knob{idx}:lsa" + assert name in df.index + assert df.loc[name, Col.scaling] == value + assert df.loc[name, Col.madx] == f"knob{idx}_madx" + assert df.loc[name, Col.lsa] == f"knob{idx}/lsa" + + @pytest.mark.basic + def test_load_knobdefinitions_fail_no_scaling(self, tmp_path): + definition_file = tmp_path / "knob_defs_tmp.txt" + definition_file.write_text( + f"knob1_madx, knob1/lsa\n" + f"knob2_madx, knob2/lsa\n" + ) + + with pytest.raises(pd.errors.ParserError) as e: + load_knobs_definitions(definition_file) + assert "expected 3 and found 2" in str(e) + + @pytest.mark.basic + def test_load_knobdefinitions_fail_wrong_scaling(self, tmp_path): + definition_file = tmp_path / "knob_defs_tmp.txt" + definition_file.write_text( + f"knob1_madx, knob1/lsa, wrong\n" + ) + + # with pytest.raises(pd.errors.ParserError): + with pytest.raises(ValueError) as e: + load_knobs_definitions(definition_file) + assert "could not convert string to float" in str(e) class TestTime: @@ -338,6 +441,10 @@ def test_extractor_in_cern_network(self, tmp_path, knob_definitions, saved_knobf # Helper ----------------------------------------------------------------------- +def knob_def(**kwargs): + return pd.Series(dict(**kwargs)) + + def parse_output_file(file_path) -> Tuple[Dict[str, float], str]: txt = Path(file_path).read_text() d = {} @@ -377,4 +484,4 @@ def saved_knobfile_and_time() -> Tuple[Path, str]: @pytest.fixture() def main_entrypoint() -> EntryPoint: - return EntryPoint(get_params(), strict=True) \ No newline at end of file + return EntryPoint(get_params(), strict=True)