added execution stats to Spark
blublinsky committed Oct 3, 2024
1 parent 02e971d commit 3651ebb
Showing 1 changed file with 9 additions and 3 deletions.
@@ -16,7 +16,7 @@
 
 from data_processing.data_access import DataAccessFactoryBase
 from data_processing.transform import TransformStatistics
-from data_processing.utils import get_logger
+from data_processing.utils import GB, get_logger
 from data_processing_spark.runtime.spark import (
     SparkTransformFileProcessor,
     SparkTransformRuntimeConfiguration,
@@ -123,6 +123,12 @@ def process_partition(iterator):
     try:
         # build and save metadata
         logger.debug("Building job metadata")
+        cpus = sc.defaultParallelism
+        executors = sc._jsc.sc().getExecutorMemoryStatus()
+        memory = 0.0
+        for i in range(executors.size()):
+            memory += executors.toList().apply(i)._2()._1()
+        resources = {"cpus": cpus, "gpus": 0, "memory": round(memory / GB, 2), "object_store": 0}
         input_params = runtime_config.get_transform_metadata() | execution_configuration.get_input_params()
         metadata = {
             "pipeline": execution_configuration.pipeline_id,
@@ -136,8 +142,8 @@ def process_partition(iterator):
             "job_input_params": input_params | data_access_factory.get_input_params(),
             "execution_stats": {
                 "num partitions": num_partitions,
-                "execution time, min": (time.time() - start_time) / 60,
-            },
+                "execution time, min": round((time.time() - start_time) / 60, 3),
+            } | resources,
             "job_output_stats": stats,
         }
         logger.debug(f"Saving job metadata: {metadata}.")