Skip to content

Commit

Permalink
Metric importer refactor (#1035)
Browse files Browse the repository at this point in the history
* Added resolution AVG to the measurements table

* (feature): Added modular importer for metrics instead of inline in runner

* Added more measurement test data

* (tests): For phase stats and metrics importer

* Rework metric provider processing mechanism and removed storing to DB / run_id

run_id is now added in segmented helper metric_importer. Run ID is a concept the metric providers should not be aware of and
is thus handled now externally.
Furthermore, the processing mechanism in the providers no longer allows overriding of externally callable functions
in read_metrics(). New private sub-functions were created that go through the separate steps of reading, parsing, validating and enriching the data.

* Added tests

* (improvement): Added all powermetrics metrics to phase_stats to remove warnings

* (improvement): Close StringIO buffer to prevent leaks if long runtime

* (consistency): containers is always required for the function

* (improvement): _check_empty introduced to have no magic assumptions about if provider can or cannot return empty rows

* (style): Typos and indents

* (fix): Tests were missing assert

* (improvement): Reading twice reduced to one in test helper

* (Tests): Fix Tests

* (fix): Duplicate name in phase_stats without functionality

* (style): Namings and typos

* Changed powermetrics file to non time underflow

* (fix): Removed email column from tests also
  • Loading branch information
ArneTR authored Dec 28, 2024
1 parent 1d7a43f commit 6cef797
Show file tree
Hide file tree
Showing 47 changed files with 246,353 additions and 327 deletions.
3 changes: 3 additions & 0 deletions docker/structure.sql
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ CREATE TABLE measurements (
detail_name text NOT NULL,
metric text NOT NULL,
value bigint NOT NULL,
resolution_avg DOUBLE PRECISION NOT NULL,
resolution_max DOUBLE PRECISION NOT NULL,
resolution_95p DOUBLE PRECISION NOT NULL,
unit text NOT NULL,
time bigint NOT NULL,
created_at timestamp with time zone DEFAULT now(),
Expand Down
57 changes: 57 additions & 0 deletions lib/metric_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from contextlib import closing
from io import StringIO

from lib.db import DB
from metric_providers.network.connections.tcpdump.system.provider import generate_stats_string

def import_measurements_new(df, metric_name, run_id):
    """Import measurements into the split measurement_metrics / measurement_values tables.

    For every distinct (detail_name, unit) pair one measurement_metrics row is
    inserted and its generated id is joined back onto the dataframe, so the bulk
    COPY into measurement_values only carries the narrow
    (measurement_metric_id, value, time) tuple.

    Args:
        df: provider dataframe; must contain detail_name, unit, value and time columns
        metric_name: name of the metric the rows belong to
        run_id: id of the run the measurement_metrics rows are attached to
    """
    df['measurement_metric_id'] = None # prepare column; filled per (detail_name, unit) group below
    detail_names = df[['detail_name', 'unit']].drop_duplicates()

    for _, row in detail_names.iterrows():
        measurement_metric_id = DB().fetch_one('''
            INSERT INTO measurement_metrics (run_id, metric, detail_name, unit)
            VALUES (%s, %s, %s, %s)
            RETURNING id
            ''', params=(run_id, metric_name, row['detail_name'], row['unit']))[0]
        df.loc[(df['detail_name'] == row['detail_name']) & (df['unit'] == row['unit']), 'measurement_metric_id'] = measurement_metric_id

    # closing() guarantees the buffer is released even if copy_from raises,
    # which a plain f.close() after the call would not
    with closing(StringIO(df[['measurement_metric_id', 'value', 'time']].to_csv(index=False, header=False))) as f:
        DB().copy_from(file=f, table='measurement_values', columns=['measurement_metric_id', 'value', 'time'], sep=',')

def import_measurements(df, metric_name, run_id, containers=None):
    """Store a metric provider dataframe in the database for the given run.

    Three storage paths exist:
    - docker proxy network connections go to the network_intercepts table
    - tcpdump connection stats are appended to the run's logs column
    - everything else is bulk-copied into the measurements table, with
      container ids mapped to readable detail names first when present

    Args:
        df: dataframe as returned by the provider's read_metrics()
        metric_name: name of the metric; selects the storage path
        run_id: id of the run the rows are attached to
        containers: mapping of container_id -> info dict with a 'name' key;
            required when df carries a container_id column
    """
    if metric_name == 'network_connections_proxy_container_dockerproxy':
        df['run_id'] = run_id
        # closing() guarantees the buffer is released even if copy_from raises
        with closing(StringIO(df.to_csv(index=False, header=False))) as f:
            DB().copy_from(file=f, table='network_intercepts', columns=df.columns, sep=',')

    elif metric_name == 'network_connections_tcpdump_system':
        DB().query("""
            UPDATE runs
            SET logs= COALESCE(logs, '') || %s -- append
            WHERE id = %s
            """, params=(generate_stats_string(df), run_id))

    else:
        if 'container_id' in df.columns:
            df = map_container_id_to_detail_name(df, containers)

        df['run_id'] = run_id

        with closing(StringIO(df.to_csv(index=False, header=False))) as f:
            DB().copy_from(file=f, table='measurements', columns=df.columns, sep=',')

def map_container_id_to_detail_name(df, containers):
    """Translate raw container ids into human readable container names.

    Ids without an entry in *containers* are kept verbatim. The container_id
    column is dropped after the translation.
    """
    id_to_name = {container_id: meta['name'] for container_id, meta in containers.items()}
    df['detail_name'] = df['container_id'].map(id_to_name).fillna(df['container_id'])
    return df.drop('container_id', axis=1)
25 changes: 11 additions & 14 deletions lib/phase_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

from lib.global_config import GlobalConfig
from lib.db import DB
from lib import utils
from lib import error_helpers

def generate_csv_line(run_id, metric, detail_name, phase_name, value, value_type, max_value, min_value, unit):
Expand All @@ -23,7 +22,7 @@ def build_and_store_phase_stats(run_id, sci=None):
sci = {}

query = """
SELECT metric, unit, detail_name
SELECT metric, unit, detail_name, AVG(resolution_avg)
FROM measurements
WHERE run_id = %s
GROUP BY metric, unit, detail_name
Expand All @@ -37,17 +36,17 @@ def build_and_store_phase_stats(run_id, sci=None):


query = """
SELECT phases, measurement_config
SELECT phases
FROM runs
WHERE id = %s
"""
data = DB().fetch_one(query, (run_id, ))
phases = DB().fetch_one(query, (run_id, ))

if not data or not data[0] or not data[1]:
if not phases or not phases[0]:
error_helpers.log_error('Phases object was empty and no phase_stats could be created. This can happen for failed runs, but should be very rare ...', run_id=run_id)
return

phases, measurement_config = data # unpack
phases = phases[0]

csv_buffer = StringIO()

Expand Down Expand Up @@ -75,7 +74,7 @@ def build_and_store_phase_stats(run_id, sci=None):
csv_buffer.write(generate_csv_line(run_id, 'phase_time_syscall_system', '[SYSTEM]', f"{idx:03}_{phase['name']}", duration, 'TOTAL', None, None, 'us'))

# now we go through all metrics in the run and aggregate them
for (metric, unit, detail_name) in metrics: # unpack
for metric, unit, detail_name, resolution_avg in metrics: # unpack
# -- saved for future if I need lag time query
# WITH times as (
# SELECT id, value, time, (time - LAG(time) OVER (ORDER BY detail_name ASC, time ASC)) AS diff, unit
Expand All @@ -84,9 +83,6 @@ def build_and_store_phase_stats(run_id, sci=None):
# ORDER BY detail_name ASC, time ASC
# ) -- Backlog: if we need derivatives / integrations in the future

provider_name = metric.replace('_', '.') + '.provider.' + utils.get_pascal_case(metric) + 'Provider'
provider_resolution_in_ms = measurement_config['providers'][provider_name]['resolution']

results = DB().fetch_one(select_query,
(run_id, metric, detail_name, phase['start'], phase['end'], ))

Expand Down Expand Up @@ -121,10 +117,10 @@ def build_and_store_phase_stats(run_id, sci=None):
if metric in ('cpu_utilization_cgroup_container', ):
cpu_utilization_containers[detail_name] = avg_value

elif metric in ['network_io_cgroup_container', 'network_io_procfs_system', 'disk_io_procfs_system', 'disk_io_cgroup_container']:
elif metric in ['network_io_cgroup_container', 'network_io_procfs_system', 'disk_io_procfs_system', 'disk_io_cgroup_container', 'disk_io_bytesread_powermetrics_vm', 'disk_io_byteswritten_powermetrics_vm']:
# I/O values should be per second. However we have very different timing intervals.
# So we do not directly use the average here, as this would be the average per sampling frequency. We go through the duration
provider_conversion_factor_to_s = decimal.Decimal(provider_resolution_in_ms/1_000)
provider_conversion_factor_to_s = decimal.Decimal(resolution_avg/1_000_000)
csv_buffer.write(generate_csv_line(run_id, metric, detail_name, f"{idx:03}_{phase['name']}", avg_value/provider_conversion_factor_to_s, 'MEAN', max_value/provider_conversion_factor_to_s, min_value/provider_conversion_factor_to_s, f"{unit}/s"))

# we also generate a total line to see how much total data was processed
Expand Down Expand Up @@ -152,8 +148,9 @@ def build_and_store_phase_stats(run_id, sci=None):
machine_energy_phase = value_sum
machine_power_phase = power_avg

else:
error_helpers.log_error('Unmapped phase_stat found, using default', metric=metric, detail_name=detail_name, run_id=run_id)
else: # Default
if metric not in ('cpu_time_powermetrics_vm', ):
error_helpers.log_error('Unmapped phase_stat found, using default', metric=metric, detail_name=detail_name, run_id=run_id)
csv_buffer.write(generate_csv_line(run_id, metric, detail_name, f"{idx:03}_{phase['name']}", value_sum, 'TOTAL', max_value, min_value, unit))

# after going through detail metrics, create cumulated ones
Expand Down
59 changes: 51 additions & 8 deletions metric_providers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import subprocess
from io import StringIO
import pandas
from typing import final

from lib.system_checks import ConfigurationCheckError
from lib import process_helpers
Expand Down Expand Up @@ -97,18 +98,16 @@ def get_stderr(self):
def has_started(self):
return self._has_started

def check_monotonic(self, df):
def _check_monotonic(self, df):
if not df['time'].is_monotonic_increasing:
raise ValueError(f"Time from metric provider {self._metric_name} is not monotonic increasing")

def check_resolution_underflow(self, df):
def _check_resolution_underflow(self, df):
if self._unit in ['mJ', 'uJ', 'Hz', 'us']:
if (df['value'] <= 1).any():
raise ValueError(f"Data from metric provider {self._metric_name} is running into a resolution underflow. Values are <= 1 {self._unit}")



def read_metrics(self, run_id, containers=None): #pylint: disable=unused-argument
def _read_metrics(self): # can be overriden in child
with open(self._filename, 'r', encoding='utf-8') as file:
csv_data = file.read()

Expand All @@ -125,13 +124,57 @@ def read_metrics(self, run_id, containers=None): #pylint: disable=unused-argumen
if df.isna().any().any():
raise ValueError(f"Dataframe for {self._metric_name} contained NA values.")

return df

def _check_empty(self, df):
if df.empty:
raise RuntimeError(f"Metrics provider {self._metric_name} metrics log file was empty.")


def _parse_metrics(self, df): # can be overriden in child
df['detail_name'] = f"[{self._metric_name.split('_')[-1]}]" # default, can be overridden in child
return df

def _add_and_validate_resolution_and_jitter(self, df):
    """Annotate each row with effective sampling-resolution statistics and
    sanity-check the observed jitter against the configured resolution.

    Adds resolution_max / resolution_avg / resolution_95p columns derived from
    consecutive 'time' diffs within each logical series, then raises RuntimeError
    when the 95th percentile deviates more than 20% from the configured resolution.
    """
    # DF can have many columns still. Since all of them might have induced a separate timing row
    # we group by everything apart from time and value itself
    # for most metric providers only detail_name and container_id should be present and differ though
    excluded_columns = ['time', 'value']
    grouping_columms = [col for col in df.columns if col not in excluded_columns]
    # diff() yields the per-sample interval; the transforms broadcast the
    # per-group aggregates back onto every row of that group
    df['effective_resolution'] = df.groupby(grouping_columms)['time'].diff()
    df['resolution_max'] = df.groupby(grouping_columms)['effective_resolution'].transform('max')
    df['resolution_avg'] = df.groupby(grouping_columms)['effective_resolution'].transform('mean')
    df['resolution_95p'] = df.groupby(grouping_columms)['effective_resolution'].transform(lambda x: x.quantile(0.95))
    df = df.drop('effective_resolution', axis=1)

    # NOTE(review): the *1000 scaling suggests self._resolution and the 'time'
    # column are in different units (presumably ms vs us) — confirm against provider config
    if (resolution_95p := df['resolution_95p'].max()) >= self._resolution*1000*1.2:
        raise RuntimeError(f"Resolution 95p was absurdly high: {resolution_95p} compared to base resolution of {self._resolution*1000}", df)

    if (resolution_95p := df['resolution_95p'].min()) <= self._resolution*1000*0.8:
        raise RuntimeError(f"Resolution 95p was absurdly low: {resolution_95p} compared to base resolution of {self._resolution*1000}", df)


    return df

def _add_unit_and_metric(self, df): # can be overriden in child
df['unit'] = self._unit
df['metric'] = self._metric_name
df['run_id'] = run_id
return df

@final
def read_metrics(self): # should not be overriden
    """Template method: read, validate and enrich this provider's captured metrics.

    Pipeline: _read_metrics -> emptiness / monotonicity / underflow checks ->
    _parse_metrics -> _add_unit_and_metric -> resolution & jitter statistics.
    Child classes customize behavior by overriding the private steps, never
    this method (enforced via @final).

    Returns the fully prepared dataframe ready for the metric importer.
    """

    df = self._read_metrics() # is not always returning a data frame, but can in rare cases also return a list if no actual numeric measurements are captured

    self._check_empty(df)

    self._check_monotonic(df) # check must be made before data frame is potentially sorted in _parse_metrics
    self._check_resolution_underflow(df)

    df = self._parse_metrics(df)
    df = self._add_unit_and_metric(df)

    df = self._add_and_validate_resolution_and_jitter(df)

    return df

Expand Down
6 changes: 1 addition & 5 deletions metric_providers/cpu/energy/rapl/msr/component/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@ def check_system(self, check_command="default", check_error_message=None, check_
if not is_rapl_energy_filtering_deactivated():
raise MetricProviderConfigurationError('RAPL energy filtering is active and might skew results!')

def read_metrics(self, run_id, containers=None):
df = super().read_metrics(run_id, containers)

if df.empty:
return df
def _parse_metrics(self, df):

df['detail_name'] = df.package_id
df = df.drop('package_id', axis=1)
Expand Down
6 changes: 1 addition & 5 deletions metric_providers/cpu/frequency/sysfs/core/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,7 @@ def __init__(self, resolution, skip_check=False):
skip_check=skip_check,
)

def read_metrics(self, run_id, containers=None):
df = super().read_metrics(run_id, containers)

if df.empty:
return df
def _parse_metrics(self, df):

df['detail_name'] = df.core_id
df = df.drop('core_id', axis=1)
Expand Down
13 changes: 0 additions & 13 deletions metric_providers/cpu/time/cgroup/container/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,3 @@ def __init__(self, resolution, skip_check=False):
current_dir=os.path.dirname(os.path.abspath(__file__)),
skip_check=skip_check,
)

def read_metrics(self, run_id, containers=None):
df = super().read_metrics(run_id, containers)

if df.empty:
return df

df['detail_name'] = df.container_id
for container_id in containers:
df.loc[df.detail_name == container_id, 'detail_name'] = containers[container_id]['name']
df = df.drop('container_id', axis=1)

return df
15 changes: 1 addition & 14 deletions metric_providers/cpu/utilization/cgroup/container/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,5 @@ def __init__(self, resolution, skip_check=False):
resolution=resolution,
unit='Ratio',
current_dir=os.path.dirname(os.path.abspath(__file__)),
skip_check = skip_check,
skip_check=skip_check,
)

def read_metrics(self, run_id, containers=None):
df = super().read_metrics(run_id, containers)

if df.empty:
return df

df['detail_name'] = df.container_id
for container_id in containers:
df.loc[df.detail_name == container_id, 'detail_name'] = containers[container_id]['name']
df = df.drop('container_id', axis=1)

return df
12 changes: 2 additions & 10 deletions metric_providers/disk/io/cgroup/container/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,8 @@ def __init__(self, resolution, skip_check=False):
skip_check=skip_check,
)

def read_metrics(self, run_id, containers=None):
df = super().read_metrics(run_id, containers)

if df.empty:
return df
def _parse_metrics(self, df):
df = super()._parse_metrics(df) # sets detail_name

df = df.sort_values(by=['container_id', 'time'], ascending=True)

Expand All @@ -42,9 +39,4 @@ def read_metrics(self, run_id, containers=None):
df['value'] = df.value.astype(int)
df = df.drop(columns=['read_bytes','written_bytes', 'written_bytes_intervals', 'read_bytes_intervals']) # clean up

df['detail_name'] = df.container_id
for container_id in containers:
df.loc[df.detail_name == container_id, 'detail_name'] = containers[container_id]['name']
df = df.drop('container_id', axis=1)

return df
8 changes: 2 additions & 6 deletions metric_providers/disk/io/procfs/system/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@ def __init__(self, resolution, skip_check=False):
skip_check=skip_check,
)

def read_metrics(self, run_id, containers=None):
df = super().read_metrics(run_id, containers)

if df.empty:
return df
def _parse_metrics(self, df):
df['detail_name'] = df['device']

df = df.sort_values(by=['device', 'time'], ascending=True)

Expand All @@ -42,7 +39,6 @@ def read_metrics(self, run_id, containers=None):
df['blocksize'] = df['device'].apply(self.get_blocksize)
df['value'] = (df['read_sectors_intervals'] + df['written_sectors_intervals'])*df['blocksize']
df['value'] = df.value.astype(int)
df['detail_name'] = df['device']
df = df.drop(columns=['read_sectors','written_sectors', 'written_sectors_intervals', 'read_sectors_intervals', 'device', 'blocksize']) # clean up

return df
Expand Down
7 changes: 2 additions & 5 deletions metric_providers/gpu/energy/nvidia/smi/component/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@ def __init__(self, resolution, skip_check=False):
def check_system(self, check_command="default", check_error_message=None, check_parallel_provider=True):
super().check_system(check_command=['which', 'nvidia-smi'], check_error_message="nvidia-smi is not installed on the system")

def read_metrics(self, run_id, containers=None):
df = super().read_metrics(run_id, containers)

if df.empty:
return df
def _parse_metrics(self, df):
df = super()._parse_metrics(df) # sets detail_name

'''
Conversion to Joules
Expand Down
3 changes: 1 addition & 2 deletions metric_providers/lmsensors/abstract_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ def check_system(self, check_command="default", check_error_message=None, check_
raise MetricProviderConfigurationError(f"{self._metric_name} provider could not be started.\nCannot find feature '{feature}' in the output section for chip starting with '{config_chip}' of the 'sensors' command.\n\nAre you running in a VM / cloud / shared hosting?\nIf so please disable the {self._metric_name} provider in the config.yml")


def read_metrics(self, run_id, containers=None):
df = super().read_metrics(run_id, containers)
def _parse_metrics(self, df):

df['detail_name'] = df.sensor_name
df = df.drop('sensor_name', axis=1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,7 @@ def check_system(self, check_command="default", check_error_message=None, check_
if not is_rapl_energy_filtering_deactivated():
raise MetricProviderConfigurationError('RAPL energy filtering is active and might skew results!')

def read_metrics(self, run_id, containers=None):
df = super().read_metrics(run_id, containers)

if df.empty:
return df
def _parse_metrics(self, df):

df['detail_name'] = df.dram_id
df = df.drop('dram_id', axis=1)
Expand Down
Loading

0 comments on commit 6cef797

Please sign in to comment.