Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trial running the rechunking tool on Beam's Dask Runner #31

Open
mattjbr123 opened this issue Nov 7, 2024 · 9 comments
Open

Trial running the rechunking tool on Beam's Dask Runner #31

mattjbr123 opened this issue Nov 7, 2024 · 9 comments
Assignees

Comments

@mattjbr123
Copy link
Collaborator

mattjbr123 commented Nov 7, 2024

https://beam.apache.org/releases/pydoc/current/apache_beam.runners.dask.dask_runner.html

It's relatively new and buggy so might prove more hassle than it's worth, but we can give it a try and move to other runners (probably Spark) if needed.

@metazool
Copy link

metazool commented Dec 5, 2024

Can I ask a bit about where you're running Beam?

Is this on JASMIN, or is it AWS-based? Is there somewhere we can read through the configuration?

cc @albags

(The context for asking was talking through the Argo Workflows setup that's being used with https://github.com/NERC-CEH/dri-ingestion - possible that Beam would be a good fit for image processing pipelines, and what's needed to ensure portability between one and the other?)

@mattjbr123
Copy link
Collaborator Author

mattjbr123 commented Dec 5, 2024

Originally the idea is to trial it on JASMIN, specifically LOTUS, their HPC-like cluster, which has dask-gateway for ease of spinning up dask clusters

Is there somewhere we can read through the configuration?

There will be once I've actually had a go at it!

@mattjbr123
Copy link
Collaborator Author

I realise a 'getting started' blurb and some more info might be helpful.
There are minimal instructions in the README for creating the necessary environment and running the script that uses Beam. Currently doesn't cover needing access to the data itself, which is on a 'group_workspace' (read 'access controlled disk space') on JASMIN (which I can happilly grant you access to if you go down that route).
So you might not be able to actually run the script yet but maybe looking at it is still somewhat helpful.

There is limited configuration needed for Beam itself at this stage of our development - it largely figures stuff out for itself with the pipeline you've given it to run. The only things we've tweaked so far is 'num_workers' when running in parallel, the 'direct_running_mode' to tell it to run in serial or parallel, and 'auto_unique_labels' which allows multiple steps in the pipeline of the same name (e.g. Print, useful for debugging).
So far we have run it 'locally', which in this case means on a JASMIN VM or PM (physical machine), and on LOTUS, JASMIN's HPC-like cluster, both using the 'Direct Runner' which is the built-in no-frills generic 'Runner' which means Beam can run anywhere.

The next step, covered by this here issue, is to move away from the limitations of the Direct Runner and use another, the first to try being the Dask Runner. What this does is allow a Beam pipeline to be translated into Dask parlance/objects/task-graphs etc. and run it wherever you have a dask cluster, such as the dask gateway that JASMIN maintains. This will probably require some more configuration, and is what I'm currently working on.

@mattjbr123
Copy link
Collaborator Author

mattjbr123 commented Dec 13, 2024

Have successfully configured the dask gateway on JASMIN and run a trial job, using the below code snippet:

import dask_gateway

gw = dask_gateway.Gateway("https://dask-gateway.jasmin.ac.uk")

options = gw.cluster_options()

# number of cores for each worker
options.worker_cores = 4

# to ensure the client, scheduler and workers all use the same python environment
options.worker_setup = "source /home/users/mattjbr/miniconda3/bin/activate /home/users/mattjbr/miniconda3/envs/daskbeam"

cluster = gw.new_cluster(options, shutdown_on_close=False)

cluster.adapt(minimum=1, maximum=4)

client = cluster.get_client()

client.status
running'

client
<Client: 'tls://172.17.11.136:39389' processes=1 threads=2, memory=8.00 GiB>

and with a config file containing an API token to allow access to the gateway in my home directory at ~/config/dask/gateway.yaml
(how we manage this 'secret' for the conversion tool we are developing needs some thought)

@mattjbr123
Copy link
Collaborator Author

mattjbr123 commented Dec 13, 2024

Xarray then would pick up this client by default, without any further config, whereas beam will require the client URL to be provided.
Next commit has added a copy of the script adapted for using dask & beam, or at least how I think it should work!

@mattjbr123
Copy link
Collaborator Author

mattjbr123 commented Dec 23, 2024

I have managed to get the pipeline/recipe to start running on the DaskRunner (more details on how below), but it is running into memory problems early on that were not present when using the DirectRunner in parallel. I'm not sure if this is more to do with the newness and lacking of some features of the DaskRunner or whether this is expected behaviour when using Dask on a workflow like this. I have asked on the pangeo discourse for advice! https://discourse.pangeo.io/t/daskrunner-and-pangeo-forge-recipes/4776

JASMIN's dask gateway has a memory limit of 20GB per worker which is not enough to resolve the issue (though when running with Beam's DirectRunner each worker never uses more than ~1GB of memory, so I feel like it should be enough...) One easy thing I can try is to throw more memory at the problem, but running with DaskRunner on a local or SLURM cluster instead to avoid the low memory limit of the gateway.

I had to make changes to the the DaskRunner code, essentially moving the config/setup stage of the dask cluster inside the DaskRunner code so that it would more easily work with the gateway cluster. This necessesitated some further changes to the DaskRunner code to enable me to pass in the dask config options to it. These changes I would look to formalise/tidy-up and merge into the Beam library via a PR if it's a setup that ends up working well.
I've included a copy of this in the next commit to keep track of it.

@mattjbr123
Copy link
Collaborator Author

mattjbr123 commented Dec 23, 2024

Seems to have the same issue running on SLURM, bypassing dask-gateway, even when the workers have 50GB mem each. It does get as far as writing out some of the dataset to disk, but ultimately there is a memory problem here.

@mattjbr123
Copy link
Collaborator Author

mattjbr123 commented Jan 10, 2025

Some helpful info from @alxmrs : https://discourse.pangeo.io/t/daskrunner-and-pangeo-forge-recipes/4776/3
Appears that the dask graph that is created from the beam pipeline isn't yet optimized, which might be causing the issue.

We are going to try and diagnose the issue and see how quickly solvable it is!

@mattjbr123
Copy link
Collaborator Author

Meanwhile I will have a go at using the SparkRunner, and see if I get anywhere with that... #27

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Development

When branches are created from issues, their pull requests are automatically linked.

2 participants