From 3651ebb9d4ccf3268f97de381bcecb04fc4a0b0b Mon Sep 17 00:00:00 2001
From: blublinsky
Date: Wed, 2 Oct 2024 14:59:05 +0100
Subject: [PATCH] added execution stats to Spark

---
 .../runtime/spark/transform_orchestrator.py  | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_orchestrator.py b/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_orchestrator.py
index 69699f16a..57a6c58fc 100644
--- a/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_orchestrator.py
+++ b/data-processing-lib/spark/src/data_processing_spark/runtime/spark/transform_orchestrator.py
@@ -16,7 +16,7 @@
 from data_processing.data_access import DataAccessFactoryBase
 from data_processing.transform import TransformStatistics
-from data_processing.utils import get_logger
+from data_processing.utils import GB, get_logger
 from data_processing_spark.runtime.spark import (
     SparkTransformFileProcessor,
     SparkTransformRuntimeConfiguration,
 )
@@ -123,6 +123,12 @@ def process_partition(iterator):
     try:
         # build and save metadata
         logger.debug("Building job metadata")
+        cpus = sc.defaultParallelism
+        executors = sc._jsc.sc().getExecutorMemoryStatus()
+        memory = 0.0
+        for i in range(executors.size()):
+            memory += executors.toList().apply(i)._2()._1()
+        resources = {"cpus": cpus, "gpus": 0, "memory": round(memory/GB, 2), "object_store": 0}
         input_params = runtime_config.get_transform_metadata() | execution_configuration.get_input_params()
         metadata = {
             "pipeline": execution_configuration.pipeline_id,
@@ -136,8 +142,8 @@
             "job_input_params": input_params | data_access_factory.get_input_params(),
             "execution_stats": {
                 "num partitions": num_partitions,
-                "execution time, min": (time.time() - start_time) / 60,
-            },
+                "execution time, min": round((time.time() - start_time) / 60, 3),
+            } | resources,
             "job_output_stats": stats,
         }
         logger.debug(f"Saving job metadata: {metadata}.")
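
Note for reviewers: below is a minimal standalone sketch of the resource probe this patch adds, so it can be tried outside the library. The SparkSession setup and the local GB constant are illustrative assumptions; in the patch, sc is the orchestrator's SparkContext and GB is imported from data_processing.utils. getExecutorMemoryStatus() is a Scala-side SparkContext method reached through py4j (hence the _jsc access); it returns a Scala Map[BlockManagerId, (Long, Long)] whose tuple holds (maximum memory available for caching, remaining memory) in bytes, so summing ._2()._1() over the entries totals the maximum memory across all executors.

    # Standalone sketch of the executor-resource query used in the patch.
    # Assumption: GB is defined locally; the library imports it from data_processing.utils.
    from pyspark.sql import SparkSession

    GB = 1024 * 1024 * 1024

    spark = SparkSession.builder.master("local[*]").appName("resource-probe").getOrCreate()
    sc = spark.sparkContext

    # Default number of partitions Spark would use, a proxy for available cores.
    cpus = sc.defaultParallelism

    # Scala Map[BlockManagerId, (Long, Long)] obtained via py4j.
    executors = sc._jsc.sc().getExecutorMemoryStatus()
    memory = 0.0
    for i in range(executors.size()):
        # toList().apply(i) yields a (BlockManagerId, (Long, Long)) pair;
        # ._2()._1() selects the tuple's first element: max memory in bytes.
        memory += executors.toList().apply(i)._2()._1()

    resources = {"cpus": cpus, "gpus": 0, "memory": round(memory / GB, 2), "object_store": 0}
    print(resources)

Since _jsc is a private PySpark attribute, this probe relies on Spark internals that may change between releases; it works against current Spark but is worth keeping an eye on during upgrades.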