Investigate options for running on a spark cluster #27

Open
dolegi opened this issue Oct 30, 2024 · 3 comments

@dolegi
Collaborator

dolegi commented Oct 30, 2024

There are some problems using the DirectRunner on JASMIN (it crashes on HTTP errors).
We need a better way to run the scripts. A Spark cluster, used via Beam's Spark runner, would probably be the most stable option.

Investigate how we can do this: does JASMIN have an existing Spark cluster, or is there one available to CEH somewhere? (Iain might know, or be able to point us in a direction.)

Another option is spinning up our own Spark cluster on JASMIN; investigate whether this is possible/reasonable.

Acceptance criteria

  • We know how to move forward with running the scripts on a Spark cluster
@mattjbr123
Collaborator

UKCEH's DataLabs has the capability of using Spark clusters. I will try there with a minimal version of the pipeline first, to see if it is at least viable in general.

@mattjbr123
Collaborator

mattjbr123 commented Jan 20, 2025

This is proving difficult.

TL;DR:
Firstly, Beam's integration with Spark is not as user-friendly and simple as it could be, and secondly, I suspect there might be issues with the Spark implementation on DataLabs that are out of my control to solve.

Full:
In theory, the Beam docs for the Spark runner should tell you all you need to know, but they are a little too brief and miss out useful bits of information. It appears that to integrate with Spark, a Beam-specific 'JobService' instance needs to be started. There appear to be at least three ways of doing this.

The first is as in the Beam docs (see the "Deploying Spark with your application" section): start the service using Docker, for which an image is already available, or run the service from the source code. What they don't say about the latter is that you can only access the necessary 'gradlew' file directly from the GitHub repository. The release zip files on GitHub and their equivalents in conda/PyPI do not include the file.
Given Docker is not available on DataLabs, the latter option is what we'd need to use. This does run fine, however when the Beam Pipeline is submitted to it a Java error is raised.

2025-01-16 11:29:02 ERROR    java.lang.ExceptionInInitializerError: Exception java.lang.IllegalAccessError: class org.apache.spark.storage.StorageUtils$ (in unnamed module @0x1ee8f3b8) cannot access class sun.nio.ch.DirectBuffer (in module java.base) because module java.base does not export sun.nio.ch to unnamed module @0x1ee8f3b8 [in thread "spark-runner-job-invoker-0"]
2025-01-16 11:29:02 INFO     Job state changed to FAILED

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
Cell In[5], line 7
      1 logging.info('Executing pipeline...')
      2 beam_options = PipelineOptions([
      3     "--runner=PortableRunner",
      4     "--job_endpoint=localhost:8099",
      5     "--environment_type=LOOPBACK"
      6 ])
----> 7 with beam.Pipeline(options=beam_options) as p:
      8    p | recipe

File /data/conda/gear/lib/python3.10/site-packages/apache_beam/pipeline.py:621, in Pipeline.__exit__(self, exc_type, exc_val, exc_tb)
    619   if not exc_type:
    620     self.result = self.run()
--> 621     self.result.wait_until_finish()
    622 finally:
    623   self._extra_context.__exit__(exc_type, exc_val, exc_tb)

File /data/conda/gear/lib/python3.10/site-packages/apache_beam/runners/portability/portable_runner.py:568, in PipelineResult.wait_until_finish(self, duration)
    565   self._observe_state(message_thread)
    567 if self._runtime_exception:
--> 568   raise self._runtime_exception
    570 return self._state

RuntimeError: Pipeline BeamApp-datalab-0116112901-a80e8767_fad54e9e-92ae-4bd7-ae7f-74473a97567b failed in state FAILED: java.lang.ExceptionInInitializerError: Exception java.lang.IllegalAccessError: class org.apache.spark.storage.StorageUtils$ (in unnamed module @0x1ee8f3b8) cannot access class sun.nio.ch.DirectBuffer (in module java.base) because module java.base does not export sun.nio.ch to unnamed module @0x1ee8f3b8 [in thread "spark-runner-job-invoker-0"]

This can apparently be worked around, but it needs some extra arguments passed in to a particular part of the program, which I don't know enough to be able to do. Alternatively, it is something to do with incompatibilities between Beam, Java and Spark (see also https://expertbeacon.com/solving-the-cannot-access-class-sun-nio-ch-directbuffer-error/). For example, Beam claims it offers support for the 2.3.x version of Spark, but the version on DataLabs is 2.5.x, with the Java version in /usr/bin/java being 17.0.12.
Maybe there is a solution for how to pass in the options here?
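
One possible way to pass the extra JVM argument, sketched below, is through the `JAVA_TOOL_OPTIONS` environment variable, which HotSpot JVMs read at start-up. This is an assumption, not something tested on DataLabs: the helper name `env_with_add_opens` is made up for illustration, and whether the JobService process (or the Spark JVM it starts) actually picks the variable up would need checking. The `--add-opens` value is the usual workaround quoted for the `sun.nio.ch.DirectBuffer` access error on Java 17.

```python
import os

# Flag commonly suggested for the sun.nio.ch.DirectBuffer IllegalAccessError.
ADD_OPENS = "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED"


def env_with_add_opens(base_env=None):
    """Return a copy of the environment with ADD_OPENS in JAVA_TOOL_OPTIONS.

    Hypothetical helper: it only prepares the environment; it does not
    start the job server itself.
    """
    env = dict(os.environ if base_env is None else base_env)
    existing = env.get("JAVA_TOOL_OPTIONS", "")
    # Avoid adding the flag twice if it is already present.
    if ADD_OPENS not in existing.split():
        env["JAVA_TOOL_OPTIONS"] = (existing + " " + ADD_OPENS).strip()
    return env
```

The returned dictionary could then be passed as `env=` to `subprocess.Popen` for whichever command starts the job server, so that any JVM launched from it inherits the flag.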

The second way of running it is to use Beam to compile the pipeline as an 'UberJar', which is something vaguely similar to a container but specific to Spark, and which contains the Beam JobServer (I think...).

python -m convert_GEAR_beam --runner=SparkRunner --spark_submit_uber_jar --spark_master_url=spark://spark-scheduler-gearconverter:7077 --environment_type=LOOPBACK

  File "/data/conda/gear/lib/python3.10/site-packages/apache_beam/runners/portability/spark_runner.py", line 59, in default_job_server
    raise ValueError('Option spark_rest_url must be set.')
ValueError: Option spark_rest_url must be set.

However, this fails too, as the Spark Master (in other words, the scheduler) needs to have a REST API available for the Pipeline 'job' to be submitted to it this way and, from what I could tell, this isn't available on the DataLabs Spark implementation.
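
A quick way to confirm whether the REST API is exposed might be to probe the port it would listen on. The sketch below assumes Spark's default REST submission port of 6066 (the standalone master only opens it when `spark.master.rest.enabled` is set, and the port can be changed), and `port_is_open` is a made-up helper name. A closed port would not prove the feature is disabled, since a firewall could also be the cause, but an open one would suggest `--spark_rest_url` is worth trying.

```python
import socket


def port_is_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and DNS lookup failures.
        return False
```

For example, `port_is_open("spark-scheduler-gearconverter", 6066)` would test the scheduler hostname used in the command above.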

The third way of running it is similar. The Pipeline is compiled to a Jar as before, but not an UberJar, and then is submitted to the Spark Master using the spark-submit executable.

python -m convert_GEAR_beam --runner=SparkRunner --output_executable_path=convert_GEAR_beam.jar

spark-submit --class org.apache.beam.runners.spark.SparkPipelineRunner --master spark://spark-scheduler-sparktest2/10.43.18.7:7077 convert_GEAR_beam.jar

25/01/20 17:18:25 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://spark-scheduler-sparktest2:7077...
25/01/20 17:18:45 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.

This again compiles fine but runs into a problem where for some reason a connection to the Spark Master cannot be established. This is the same error I get if I try to run a simple Spark example via a Jupyter notebook, leading me to believe this is something to do with the DataLabs implementation of Spark and not my code.

import os
import pyspark

conf = pyspark.SparkConf()
# The below option can be altered depending on your memory requirement.
# The maximum value is the amount of memory assigned to each worker, minus 1GB.
conf.set('spark.executor.memory', '3g')

os.environ["PYSPARK_PYTHON"] = "/data/conda/gear/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/data/conda/gear/bin/python"

sc = pyspark.SparkContext(master="spark://spark-scheduler-sparktest2:7077", conf=conf)

sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Cell In[2], line 1
----> 1 sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()

File /usr/local/spark/python/pyspark/context.py:824, in SparkContext.parallelize(self, c, numSlices)
    821     assert self._jvm is not None
    822     return self._jvm.PythonParallelizeServer(self._jsc.sc(), numSlices)
--> 824 jrdd = self._serialize_to_jvm(c, serializer, reader_func, createRDDServer)
    825 return RDD(jrdd, self, serializer)

File /usr/local/spark/python/pyspark/context.py:870, in SparkContext._serialize_to_jvm(self, data, serializer, reader_func, server_func)
    868     finally:
    869         tempFile.close()
--> 870     return reader_func(tempFile.name)
    871 finally:
    872     # we eagerly reads the file so we can delete right after.
    873     os.unlink(tempFile.name)

File /usr/local/spark/python/pyspark/context.py:818, in SparkContext.parallelize.<locals>.reader_func(temp_filename)
    816 def reader_func(temp_filename: str) -> JavaObject:
    817     assert self._jvm is not None
--> 818     return self._jvm.PythonRDD.readRDDFromFile(self._jsc, temp_filename, numSlices)

File /usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File /usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.readRDDFromFile.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:
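
One small thing that could be ruled out: the spark-submit command above was given `spark://spark-scheduler-sparktest2/10.43.18.7:7077`, while the log and the notebook example use `spark://spark-scheduler-sparktest2:7077`. Since the notebook fails with the plain form too, this is unlikely to be the whole story, but a check like the sketch below would catch a malformed master URL before submitting. It assumes the usual `spark://host:port` form for a standalone master, and `parse_master_url` is a hypothetical helper, not part of Spark or Beam.

```python
from urllib.parse import urlsplit


def parse_master_url(url):
    """Split a standalone master URL into (host, port), or raise ValueError."""
    parts = urlsplit(url)
    if parts.scheme != "spark":
        raise ValueError(f"expected a spark:// URL, got {url!r}")
    # Anything after host:port (such as "/10.43.18.7:7077") is rejected.
    if parts.path not in ("", "/") or parts.query or parts.fragment:
        raise ValueError(f"unexpected trailing component in {url!r}")
    if parts.hostname is None or parts.port is None:
        raise ValueError(f"expected spark://host:port, got {url!r}")
    return parts.hostname, parts.port
```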

@mattjbr123
Collaborator

Next thing to try might be this? https://github.com/moradology/beam-pyspark-runner
