Skip to content

Commit

Permalink
Knob extractor pandafied (#399)
Browse files Browse the repository at this point in the history
* Everything is tfs-pandas
* Time in the TFS-Header
* "Others" category printed only when present
* Version bump and changelog
  • Loading branch information
JoschD authored Sep 27, 2022
1 parent fab4c49 commit f91837e
Show file tree
Hide file tree
Showing 4 changed files with 316 additions and 133 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# OMC3 Changelog

#### 2022-09-27 - v0.6.3

- Pandafied `knob_extractor`: it now uses `tfs-pandas` DataFrames internally and as its Python output.

#### 2022-09-22 - v0.6.2

- Cleaned logging in `knob_extractor`

#### 2022-09-21 - v0.6.1

- Added:
Expand Down
2 changes: 1 addition & 1 deletion omc3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
__title__ = "omc3"
__description__ = "An accelerator physics tools package for the OMC team at CERN."
__url__ = "https://github.com/pylhc/omc3"
__version__ = "0.6.2"
__version__ = "0.6.3"
__author__ = "pylhc"
__author_email__ = "[email protected]"
__license__ = "MIT"
Expand Down
216 changes: 142 additions & 74 deletions omc3/knob_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,15 @@
import logging
import math
import re
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Tuple, Union

import pandas as pd
import tfs
from dateutil.relativedelta import relativedelta
from generic_parser import EntryPointParameters, entrypoint

import tfs
from generic_parser import EntryPointParameters, entrypoint
from omc3.utils.iotools import PathOrStr, PathOrStrOrDataFrame
from omc3.utils.logging_tools import get_logger
from omc3.utils.mock import cern_network_import
Expand All @@ -92,6 +91,20 @@

MINUS_CHARS: Tuple[str, ...] = ("_", "-")


class Col:
    """ DataFrame Columns used in this script. """
    madx: str = "madx"  # name of the MAD-X variable of the knob
    lsa: str = "lsa"  # name of the knob in LSA
    scaling: str = "scaling"  # factor multiplied onto the extracted value
    value: str = "value"  # value extracted for the knob


class Head:
    """ TFS Headers used in this script. """
    time: str = "EXTRACTION_TIME"  # header holding the time the knobs were extracted for


KNOB_CATEGORIES: Dict[str, List[str]] = {
"sep": [
"LHCBEAM:IP1-SEP-H-MM",
Expand Down Expand Up @@ -204,22 +217,6 @@ def get_params():
)


@dataclass
class KnobEntry:
    """ Definition of a single knob, together with its (optional) extracted value. """
    madx: str  # the name of the MAD-X variable for this knob
    lsa: str  # the name of the knob in LSA itself
    scaling: float  # is usually +-1, i.e. takes care of sign-conventions
    value: Optional[float] = None  # None as long as no value has been extracted

    def get_madx_command(self) -> str:
        """ Return the MAD-X line for this knob.

        Returns:
            str: The assignment of the scaled value to the MAD-X variable,
            or a line starting with ``!`` if no value is set.
        """
        if self.value is None:
            return f"! {self.madx} : No Value extracted"
        return f"{self.madx} := {self.value * self.scaling};"


KnobsDict = Dict[str, KnobEntry]


@entrypoint(
get_params(), strict=True,
argument_parser_args=dict(
Expand All @@ -228,7 +225,7 @@ def get_madx_command(self) -> str:
prog="Knob Extraction Tool."
)
)
def main(opt) -> Optional[KnobsDict]:
def main(opt) -> Optional[tfs.TfsDataFrame]:
""" Main knob extracting function. """
ldb = pytimber.LoggingDB(source="nxcals", loglevel=logging.ERROR)
time = _parse_time(opt.time, opt.timedelta)
Expand All @@ -243,39 +240,36 @@ def main(opt) -> Optional[KnobsDict]:
return None

knobs_dict = _parse_knobs_defintions(opt.knob_definitions)
knobs_extract = _extract(ldb, knobs_dict, opt.knobs, time)
knobs_extract = _extract_and_gather(ldb, knobs_dict, opt.knobs, time)
if opt.output:
_write_knobsfile(opt.output, knobs_extract, time)
_write_knobsfile(opt.output, knobs_extract)
return knobs_extract


def _extract(ldb, knobs_dict: KnobsDict, knob_categories: Sequence[str], time: datetime) -> KnobsDict:
def extract(ldb, knobs: Sequence[str], time: datetime) -> Dict[str, float]:
"""
Main function to gather data from the state-tracker.
Standalone function to gather data from the StateTracker.
Extracts data via pytimber's LoggingDB for the knobs given
(either by name or by category) in knobs.
Args:
ldb (pytimber.LoggingDB): The pytimber database.
knobs_dict (KnobsDict): A mapping of all knob-names to KnobEntries.
knob_categories (Sequence[str]): Knob Categories or Knob-Names to extract.
knobs (Sequence[str]): Knob Categories or Knob-Names to extract.
time (datetime): The time, when to extract.
Returns:
Dict[str, KnobsDict]: Contains all the extracted knobs, grouped by categories.
When extraction was not possible, the value attribute of the respective KnobEntry is still None
Dict[str, float]: Contains all the extracted knobs.
When extraction was not possible, the value is None.
"""
LOGGER.info(f"---- EXTRACTING KNOBS @ {time} ----")
knobs = {}
knobs_extracted = {}

for category in knob_categories:
for category in knobs:
for knob in KNOB_CATEGORIES.get(category, [category]):
try:
knobs[knob] = knobs_dict[knob]
except KeyError as e:
raise KeyError(f"Knob '{knob}' not found in the knob-definitions!") from e

# LOGGER.debug(f"Looking for {knob:<34s} ") # pytimber logs this to info anyway
knobkey = f"LhcStateTracker:{knob}:target"
knobs_extracted[knob] = None # to log that this was tried to be extracted.

knobvalue = ldb.get(knobkey, time.timestamp()) # use timestamp to preserve timezone info
if knobkey not in knobvalue:
LOGGER.warning(f"No value for {knob} found")
Expand All @@ -292,38 +286,96 @@ def _extract(ldb, knobs_dict: KnobsDict, knob_categories: Sequence[str], time: d
continue

LOGGER.info(f"Knob value for {knob} extracted: {value} (unscaled)")
knobs[knob].value = value
knobs_extracted[knob] = value

return knobs_extracted


def check_for_undefined_knobs(knobs_definitions: pd.DataFrame, knob_categories: Sequence[str]) -> None:
    """ Check that all knobs are actually defined in the knobs-definitions.

    Args:
        knobs_definitions (pd.DataFrame): The knob-definitions, with the knob-names as index.
        knob_categories (Sequence[str]): Knob Categories or Knob-Names to extract.

    Raises:
        KeyError: If one or more of the knobs don't have a definition.
    """
    # Expand categories into their knobs; an entry that is not a known
    # category is taken to be a knob-name itself.
    knob_names = [knob for category in knob_categories for knob in KNOB_CATEGORIES.get(category, [category])]

    # dict.fromkeys removes duplicates but keeps the order, so that a knob
    # requested twice (e.g. by name and via its category) is reported only once.
    undefined_knobs = [knob for knob in dict.fromkeys(knob_names) if knob not in knobs_definitions.index]
    if undefined_knobs:
        raise KeyError(
            "The following knob(s) could not be found "
            f"in the knob-definitions: '{', '.join(undefined_knobs)}'"
        )


def _extract_and_gather(ldb, knobs_definitions: pd.DataFrame,
                        knob_categories: Sequence[str],
                        time: datetime) -> tfs.TfsDataFrame:
    """
    Main function to gather data from the StateTracker and the knob-definitions.
    All given knobs (either in categories or as knob names) to be extracted
    are checked for being present in the ``knob_definitions``.
    A TfsDataFrame is returned, containing the knob-definitions of the
    requested knobs and the extracted value (or NAN if not successful).

    Args:
        ldb (pytimber.LoggingDB): The pytimber database.
        knobs_definitions (pd.DataFrame): The knob-definitions, with the knob-names
            as index and at least the columns 'lsa', 'madx' and 'scaling'.
        knob_categories (Sequence[str]): Knob Categories or Knob-Names to extract.
        time (datetime): The time, when to extract.

    Returns:
        tfs.TfsDataFrame: Contains all the extracted knobs, in columns containing
        their madx-name, lsa-name, scaling and extracted value.
        When extraction was not possible, the value of the respective entry is NAN.

    Raises:
        KeyError: If one or more of the requested knobs don't have a definition.
    """
    check_for_undefined_knobs(knobs_definitions, knob_categories)
    extracted_knobs = extract(ldb, knobs=knob_categories, time=time)

    knob_names = list(extracted_knobs.keys())
    definition_columns = [Col.lsa, Col.madx, Col.scaling]
    knobs = tfs.TfsDataFrame(index=knob_names,
                             columns=[*definition_columns, Col.value],
                             headers={Head.time: time})

    # Assigning a DataFrame to a list of columns matches the columns by position,
    # so select the definition columns by name (and in this order) explicitly.
    knobs[definition_columns] = knobs_definitions.loc[knob_names, definition_columns]

    # Enforce float, so that knobs without an extracted value (None) become NaN,
    # even if no value could be extracted at all.
    knobs[Col.value] = pd.Series(extracted_knobs, dtype=float)
    return knobs


def _write_knobsfile(output: Union[Path, str], collected_knobs: KnobsDict, time):
def _write_knobsfile(output: Union[Path, str], collected_knobs: tfs.TfsDataFrame):
""" Takes the collected knobs and writes them out into a text-file. """
collected_knobs = collected_knobs.copy() # to not modify the return dict
collected_knobs = collected_knobs.copy() # to not modify the df

# Sort the knobs by category
category_knobs = {c: {} for c in KNOB_CATEGORIES.keys()}
for category, names in KNOB_CATEGORIES.items():
for name in names:
if name in collected_knobs.keys():
category_knobs[category][name] = collected_knobs.pop(name)
category_knobs["Other Knobs"] = collected_knobs
category_knobs = {}
for category, category_names in KNOB_CATEGORIES.items():
names = [name for name in collected_knobs.index if name in category_names]
if not names:
continue

category_knobs[category] = collected_knobs.loc[names, :]
collected_knobs = collected_knobs.drop(index=names)

if len(collected_knobs): # leftover knobs without category
category_knobs["Other Knobs"] = collected_knobs

# Write them out
with open(output, "w") as outfile:
outfile.write(f"!! --- knobs extracted by knob_extractor\n")
outfile.write(f"!! --- extracted knobs for time {time}\n\n")
for category, knobs in category_knobs.items():
if not knobs:
continue
outfile.write(f"!! --- extracted knobs for time {collected_knobs.headers[Head.time]}\n\n")
for category, knobs_df in category_knobs.items():
outfile.write(f"!! --- {category:10} --------------------\n")
for knob, knob_entry in knobs.items():
outfile.write(f"{knob_entry.get_madx_command()}\n")
for knob, knob_entry in knobs_df.iterrows():
outfile.write(f"{get_madx_command(knob_entry)}\n")
outfile.write("\n")
outfile.write("\n")


# Knobs Dict -------------------------------------------------------------------
# Knobs Definitions ------------------------------------------------------------

def _get_knobs_def_file(user_defined: Optional[Union[Path, str]] = None) -> Path:
""" Check which knobs-definition file is appropriate to take. """
Expand All @@ -343,57 +395,73 @@ def _get_knobs_def_file(user_defined: Optional[Union[Path, str]] = None) -> Path
raise FileNotFoundError("None of the knobs-definition files are available.")


def _load_knobs_dict(file_path: Union[Path, str]) -> KnobsDict:
""" Load the knobs-definition file and convert into KnobsDict.
Each line in this file should consist of four comma separated entries:
madx-name, lsa-name, scaling factor, knob-test value.
def load_knobs_definitions(file_path: Union[Path, str]) -> pd.DataFrame:
""" Load the knobs-definition file and convert into a DataFrame.
Each line in this file should consist of at least three comma separated
entries in the following order: madx-name, lsa-name, scaling factor.
Other columns are ignored.
Alternatively, a TFS-file is also allowed, but needs to have the suffix ``.tfs``.
Args:
file_path (Path): Path to the knobs definition file.
Returns:
Dictionary with LSA names (but with colon instead of /) as
Dataframe with LSA names (but with colon instead of /) as
keys and KnobEntries (without values) as value.
"""
if Path(file_path).suffix == ".tfs":
# just in case someone wants to give tfs files (hidden feature)
df = tfs.read_tfs(file_path)
else:
# parse csv file (the official way)
dtypes = {"madx": str, "lsa": str, "scaling": float, "test": float}
converters = {'madx': str.strip, 'lsa': str.strip} # strip whitespaces
df = pd.read_csv(file_path, comment="#", names=dtypes.keys(), dtype=dtypes, converters=converters)
return _dataframe_to_knobsdict(df)


def _dataframe_to_knobsdict(df: pd.DataFrame) -> KnobsDict:
""" Converts a DataFrame into the required Dictionary structure.
converters = {Col.madx: str.strip, Col.lsa: str.strip} # strip whitespaces
dtypes = {Col.scaling: float}
names = (Col.madx, Col.lsa, Col.scaling)
df = pd.read_csv(file_path,
comment="#",
usecols=list(range(len(names))), # only read the first columns
names=names,
dtype=dtypes,
converters=converters)
return _to_knobs_dataframe(df)


def _to_knobs_dataframe(df: pd.DataFrame) -> pd.DataFrame:
""" Adapts a DataFrame to the conventions used here:
StateTracker variable name as index, all columns lower-case.
Args:
df (pd.DataFrame): DataFrame containing at least the columns
'lsa', 'madx', 'scaling' (upper or lowercase)
Returns:
Dictionary with LSA names (but with colon instead of /) as
keys and KnobEntries (without values) as value.
Dataframe with LSA names (but with colon instead of /) as
keys and 'lsa', 'madx', 'scaling' and (empty) 'value' columns.
"""
df.columns = df.columns.astype(str).str.lower()
df = df[['lsa', 'madx', 'scaling']].set_index("lsa", drop=False)
return {
lsa2name(r[0]): KnobEntry(**r[1].to_dict()) for r in df.iterrows()
}
df = df[[Col.lsa, Col.madx, Col.scaling]].set_index(Col.lsa, drop=False)
df.index = df.index.map(lsa2name)
return df


def _parse_knobs_defintions(knobs_def_input: Optional[Union[Path, str, pd.DataFrame]]) -> KnobsDict:
def _parse_knobs_defintions(knobs_def_input: Optional[Union[Path, str, pd.DataFrame]]) -> pd.DataFrame:
""" Parse the given knob-definitions either from a csv-file or from a DataFrame. """
if isinstance(knobs_def_input, pd.DataFrame):
return _dataframe_to_knobsdict(knobs_def_input)
return _to_knobs_dataframe(knobs_def_input)

# input points to a file or is None
knobs_def_file = _get_knobs_def_file(knobs_def_input)
return _load_knobs_dict(knobs_def_file)
return load_knobs_definitions(knobs_def_file)


def get_madx_command(knob_data: pd.Series) -> str:
    """ Create the MAD-X line for the given knob.

    Args:
        knob_data (pd.Series): Data of a single knob, containing at least
            the entries 'madx', 'scaling' and 'value'.

    Returns:
        str: The assignment of the scaled value to the madx-name of the knob,
        or a line starting with ``!`` if no value was extracted (None or NaN).

    Raises:
        KeyError: If there is no 'value' entry in ``knob_data``.
    """
    if Col.value not in knob_data.index:
        raise KeyError("Value entry not found in extracted knob_data. "
                       "Something went wrong as it should at least be NaN.")

    value = knob_data[Col.value]
    if pd.isna(value):  # also True for None
        return f"! {knob_data[Col.madx]} : No Value extracted"
    return f"{knob_data[Col.madx]} := {value * knob_data[Col.scaling]};"


# Time Tools -------------------------------------------------------------------
Expand Down
Loading

0 comments on commit f91837e

Please sign in to comment.