---
execute:
freeze: auto
---
# Distributed computing {#crew}
```{r, message = FALSE, warning = FALSE, echo = FALSE}
knitr::opts_knit$set(root.dir = fs::dir_create(tempfile()))
knitr::opts_chunk$set(collapse = TRUE, comment = "#>", eval = TRUE)
```
```{r, message = FALSE, warning = FALSE, echo = FALSE, eval = TRUE}
library(targets)
library(tarchetypes)
```
:::{.callout-note}
## Package versions
For best results, please use `targets` version `1.2.0` or higher and `crew` version `0.3.0` or higher. If you cannot install packages globally, consider creating a local [`renv`](https://rstudio.github.io/renv/articles/renv.html) package library for your project.
```{r, eval = FALSE}
renv::init()
renv::install("crew")
renv::install("targets")
renv::snapshot()
```
:::
:::{.callout-tip}
## Performance
See the [performance chapter](#performance) for options, settings, and other choices to make parallel and distributed pipelines more efficient.
:::
To efficiently process a large and complex pipeline, [`tar_make()`](https://docs.ropensci.org/targets/reference/tar_make.html) can run multiple targets at the same time. Thanks to integration with [`crew`](https://wlandau.github.io/crew/) and blazing fast scheduling from [`mirai`](https://github.com/shikokuchuo/mirai) behind the scenes, those targets can run on a variety of high-performance computing platforms, and they can scale out to hundreds of parallel workers and beyond.
## How it works
1. Write your pipeline as usual, but set the `controller` argument of [`tar_option_set`](https://docs.ropensci.org/targets/reference/tar_option_set.html) to the [`crew`](https://wlandau.github.io/crew/) controller of your choice.
2. Run the pipeline with a simple [`tar_make()`](https://docs.ropensci.org/targets/reference/tar_make.html).
The [`crew`](https://wlandau.github.io/crew/) controller from (1) allows [`tar_make()`](https://docs.ropensci.org/targets/reference/tar_make.html) to launch external R processes called "workers" which can each run one or more targets. By delegating long-running targets to these workers, the local R session is free to focus on other tasks, and the pipeline finishes faster.
## Example
The following `_targets.R` file uses a [local process controller](https://wlandau.github.io/crew/reference/crew_controller_local.html) with 2 workers. That means up to 2 workers can be running at any given time, and each worker is a separate R process on the same computer as the local R process.
```{r, echo = FALSE, eval = TRUE}
tar_script({
get_data <- function() TRUE
run_model1 <- function(data) data
run_model2 <- function(data) data
run_model3 <- function(data) data
plot_model <- function(model) model
list(
tar_target(name = data, command = get_data()),
tar_target(name = model1, command = run_model1(data)),
tar_target(name = model2, command = run_model2(data)),
tar_target(name = model3, command = run_model3(data)),
tar_target(name = plot1, command = plot_model(model1)),
tar_target(name = plot2, command = plot_model(model2)),
tar_target(name = plot3, command = plot_model(model3))
)
})
```
```{r, eval = FALSE}
# _targets.R
library(targets)
library(tarchetypes)
library(crew)
tar_option_set(
controller = crew_controller_local(workers = 2)
)
tar_source()
list(
tar_target(name = data, command = get_data()),
tar_target(name = model1, command = run_model1(data)),
tar_target(name = model2, command = run_model2(data)),
tar_target(name = model3, command = run_model3(data)),
tar_target(name = plot1, command = plot_model(model1)),
tar_target(name = plot2, command = plot_model(model2)),
tar_target(name = plot3, command = plot_model(model3))
)
```
```{r, eval = TRUE}
# R console
tar_visnetwork(targets_only = TRUE)
```
Run the pipeline with a simple call to [`tar_make()`](https://docs.ropensci.org/targets/reference/tar_make.html). Please note that real-life pipelines will have longer execution times, especially for the models.
```{r, eval = FALSE, echo = TRUE}
# R console
tar_make()
```
```{r, eval = TRUE, echo = FALSE}
tar_make(callr_arguments = list(spinner = FALSE))
```
Let's walk through what happens in the above call to [`tar_make()`](https://docs.ropensci.org/targets/reference/tar_make.html). First, [`tar_make()`](https://docs.ropensci.org/targets/reference/tar_make.html) sends the `data` target to the `crew` queue, and a single worker launches to run it. After the `data` target completes, all three models are ready to begin. A second worker automatically launches to meet the increased demand of the workload, and each of the two workers starts to run a model. After one of the models finishes, its worker is free to either run the downstream plot or the third model. The process continues until all the targets are complete. The workers shut down when the pipeline is done.
## Configuration and auto-scaling
Adding more workers might speed up your pipeline, but not always. Beyond a certain point, the efficiency gains will diminish, and the extra workers will have nothing to do. With proper configuration, you can find the right balance.
As mentioned above, new workers launch automatically in response to increasing demand. By default, they stay running for the duration of the pipeline. However, you can customize the controller to scale down when circumstances allow, which helps avoid wasting resources.^[Automatic down-scaling also helps comply with wall time restrictions on shared computing clusters. See the arguments of [`crew_controller_local()`](https://wlandau.github.io/crew/reference/crew_controller_local.html) for details.] The most useful arguments for down-scaling, in order of importance, are:
1. `seconds_idle`: automatically shut down a worker if it spends too long waiting for a target.
2. `tasks_max`: maximum number of tasks a worker can run before shutting down.
3. `seconds_wall`: soft wall time of a worker.
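For illustration, here is a minimal sketch of a local controller that combines all three arguments. The specific values are placeholders, not recommendations, and the right settings depend on your workload.
```{r, eval = FALSE}
library(crew)
controller <- crew_controller_local(
  workers = 4,
  seconds_idle = 10,   # Shut down a worker after 10 seconds without a target.
  tasks_max = 100,     # Retire a worker after it runs 100 targets.
  seconds_wall = 3600  # Softly cap each worker instance at one hour.
)
```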
On the other hand, it is not always helpful to eagerly down-scale workers. Because the workload can fluctuate rapidly, some workers may quit and relaunch so often that it creates noticeable overhead.
Fortunately, it is straightforward to explore auto-scaling and configuration issues empirically. Simply run your existing configuration with `tar_make()` and then look at the output from `tar_crew()`. For example, consider the following pipeline with 1001 quick targets, 10 workers, and a maximum idle time of 3 seconds.
```{r, eval = FALSE}
# _targets.R file
library(targets)
library(tarchetypes)
controller <- crew::crew_controller_local(
name = "my_controller",
workers = 10,
seconds_idle = 3
)
tar_option_set(controller = controller)
list(
tar_target(x, seq_len(1000)),
tar_target(y, x, pattern = map(x))
)
```
After running `tar_make()`, `tar_crew()` shows the following worker metadata:
```{r, eval = FALSE}
tar_crew()
#> # A tibble: 10 × 5
#> controller worker launches seconds targets
#> <chr> <int> <int> <dbl> <int>
#> 1 my_controller 1 1 1.77 998
#> 2 my_controller 2 4 0.076 3
#> 3 my_controller 3 1 0 0
#> 4 my_controller 4 0 0 0
#> 5 my_controller 5 0 0 0
#> 6 my_controller 6 0 0 0
#> 7 my_controller 7 0 0 0
#> 8 my_controller 8 0 0 0
#> 9 my_controller 9 0 0 0
#> 10 my_controller 10 0 0 0
```
The first worker did most of the work, and the second worker ran only 3 targets. Both the second and third workers launched and self-terminated more often than they ran targets. In fact, the third worker did not run any targets at all, and the remaining workers never launched. For this pipeline, it would be better not to use `crew` at all, or to set `deployment = "main"` in `tar_target()` for the targets that complete instantly, as in the sketch below. And if 2 workers were truly busy instead of just 1, it would be reasonable to set `workers = 2` and pick a much higher value of `seconds_idle`.
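As a hypothetical revision of the pipeline above, the quick targets could bypass the workers entirely with `deployment = "main"`:
```{r, eval = FALSE}
# _targets.R file (hypothetical revision)
library(targets)
library(tarchetypes)
list(
  tar_target(x, seq_len(1000), deployment = "main"),
  tar_target(y, x, pattern = map(x), deployment = "main")
)
```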
But now, suppose the targets in the pipeline take longer to run:
```{r, eval = FALSE}
# _targets.R file:
library(targets)
library(tarchetypes)
controller <- crew::crew_controller_local(
name = "my_controller",
workers = 10,
seconds_idle = 3
)
tar_option_set(controller = controller)
list(
tar_target(x, seq_len(1000)),
tar_target(y, Sys.sleep(1), pattern = map(x)) # Run for 1 second.
)
```
The `tar_crew()` worker metadata is far more sensible: each worker launched once and accomplished a lot. And because `seconds_idle` was set to 3, we know each worker instance wasted no more than 3 seconds waiting for new targets to run. As long as resources allow, it is appropriate to have `workers = 10` and `seconds_idle = 3` for this pipeline: this configuration speeds up the pipeline while avoiding wasted resources.
```{r, eval = FALSE}
tar_crew()
#> # A tibble: 10 × 5
#> controller worker launches seconds targets
#> <chr> <int> <int> <dbl> <int>
#> 1 my_controller 1 1 103. 104
#> 2 my_controller 2 1 100. 100
#> 3 my_controller 3 1 100. 100
#> 4 my_controller 4 1 100. 100
#> 5 my_controller 5 1 100. 100
#> 6 my_controller 6 1 100. 100
#> 7 my_controller 7 1 99.4 99
#> 8 my_controller 8 1 100. 100
#> 9 my_controller 9 1 99.4 99
#> 10 my_controller 10 1 99.4 99
```
## Backends
[`crew`](https://wlandau.github.io/crew/) supports multiple computing platforms: not just local processes, but also traditional high-performance computing systems and cloud computing services. For example, to run each worker as a job on a [Sun Grid Engine](https://en.wikipedia.org/wiki/Oracle_Grid_Engine) cluster, use [`crew_controller_sge()`](https://wlandau.github.io/crew.cluster/reference/crew_controller_sge.html) from the [`crew.cluster`](https://wlandau.github.io/crew.cluster/) package.
```{r, eval = FALSE}
# _targets.R
library(targets)
library(tarchetypes)
library(crew.cluster)
tar_option_set(
controller = crew_controller_sge(
workers = 3,
script_lines = "module load R",
sge_log_output = "log_folder/"
)
)
tar_source()
list(
tar_target(name = data, command = get_data()),
tar_target(name = model1, command = run_model1(data)),
tar_target(name = model2, command = run_model2(data)),
tar_target(name = model3, command = run_model3(data)),
tar_target(name = plot1, command = plot_model(model1)),
tar_target(name = plot2, command = plot_model(model2)),
tar_target(name = plot3, command = plot_model(model3))
)
```
If [`crew.cluster`](https://wlandau.github.io/crew.cluster/) and other official packages do not meet your needs, then you can write your own launcher plugin tailored to your own specific computing environment. [`crew`](https://wlandau.github.io/crew/) makes this process straightforward, and the vignette at <https://wlandau.github.io/crew/articles/launcher_plugins.html> walks through the details step by step.
## Heterogeneous workers
Different targets may have different computing requirements, from memory to GPUs and beyond. You can send different targets to different kinds of workers using [`crew` controller groups](https://wlandau.github.io/crew/articles/groups.html). In the `_targets.R` file below, we create a local process controller alongside a [Sun Grid Engine](https://en.wikipedia.org/wiki/Oracle_Grid_Engine) controller with a memory requirement and a GPU. We combine them in a [`crew` controller group](https://wlandau.github.io/crew/articles/groups.html), which we supply to the `controller` argument of [`tar_option_set`](https://docs.ropensci.org/targets/reference/tar_option_set.html). Next, we use [`tar_resources()`](https://docs.ropensci.org/targets/reference/tar_resources.html) and [`tar_resources_crew()`](https://docs.ropensci.org/targets/reference/tar_resources_crew.html) to tell `model2` to run on [Sun Grid Engine](https://en.wikipedia.org/wiki/Oracle_Grid_Engine) and all other targets to run on local processes. The `deployment = "main"` argument tells the plots to skip worker processes altogether and run on the main central R process.
```{r, eval = FALSE}
# _targets.R
library(targets)
library(tarchetypes)
library(crew)
library(crew.cluster)
controller_local <- crew_controller_local(
name = "my_local_controller",
workers = 2,
seconds_idle = 10
)
controller_sge <- crew_controller_sge(
name = "my_sge_controller",
workers = 3,
seconds_idle = 15,
script_lines = "module load R",
sge_log_output = "log_folder/",
sge_memory_gigabytes_required = 64,
sge_gpu = 1
)
tar_option_set(
controller = crew_controller_group(controller_local, controller_sge),
resources = tar_resources(
crew = tar_resources_crew(controller = "my_local_controller")
)
)
tar_source()
list(
tar_target(name = data, command = get_data()),
tar_target(name = model1, command = run_model1(data)),
tar_target(
name = model2,
command = run_model2(data),
resources = tar_resources(
crew = tar_resources_crew(controller = "my_sge_controller")
)
),
  tar_target(name = model3, command = run_model3(data)),
tar_target(name = plot1, command = plot_model(model1), deployment = "main"),
tar_target(name = plot2, command = plot_model(model2), deployment = "main"),
tar_target(name = plot3, command = plot_model(model3), deployment = "main")
)
```
## Resource usage
The [`autometric`](https://wlandau.github.io/autometric) package can monitor the CPU and memory consumption of the various processes in a `targets` pipeline, both local processes and parallel workers. Please read <https://wlandau.github.io/crew/articles/logging.html> for details and examples.
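As a rough sketch, and assuming a version of `crew` recent enough to support the `crew_options_metrics()` helper described in that article, a controller can write resource logs which `autometric` reads back afterwards:
```{r, eval = FALSE}
# _targets.R (sketch): each crew worker logs its own CPU and memory usage.
library(crew)
library(targets)
tar_option_set(
  controller = crew_controller_local(
    workers = 2,
    options_metrics = crew_options_metrics(
      path = "worker_logs/",  # Directory of log files, one per worker.
      seconds_interval = 1    # Sample resource usage once per second.
    )
  )
)
# The rest of the pipeline is unchanged.
```
After `tar_make()` completes, `autometric::log_read("worker_logs/")` parses the logs into a data frame suitable for plotting.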
## Thanks
The [`crew`](https://wlandau.github.io/crew/) package is an extension of [`mirai`](https://github.com/shikokuchuo/mirai), a sleek and sophisticated task scheduler that efficiently processes intense workloads. [`crew`](https://wlandau.github.io/crew/) is only possible because of the amazing work by [Charlie Gao](https://github.com/shikokuchuo) in packages [`mirai`](https://github.com/shikokuchuo/mirai) and [`nanonext`](https://github.com/shikokuchuo/nanonext).