crew_class_controller {crew}R Documentation

Controller class

Description

R6 class for controllers.

Details

See crew_controller().

Active bindings

client

Router object.

launcher

Launcher object.

tasks

A list of mirai::mirai() task objects.

pushed

Number of tasks pushed since the controller was started.

popped

Number of tasks popped since the controller was started.

log

Tibble with per-worker metadata about tasks.

error

Tibble of task results (with one result per row) from the last call to ⁠map(error = "stop)⁠.

backlog

Character vector of explicitly backlogged tasks.

autoscaling

TRUE or FALSE, whether async later-based auto-scaling is currently running

Methods

Public methods


Method new()

mirai controller constructor.

Usage
crew_class_controller$new(client = NULL, launcher = NULL)
Arguments
client

Router object. See crew_controller().

launcher

Launcher object. See crew_controller().

Returns

An R6 controller object.

Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
launcher <- crew_launcher_local()
controller <- crew_controller(client = client, launcher = launcher)
controller$start()
controller$push(name = "task", command = sqrt(4))
controller$wait()
controller$pop()
controller$terminate()
}

Method validate()

Validate the client.

Usage
crew_class_controller$validate()
Returns

NULL (invisibly).


Method empty()

Check if the controller is empty.

Usage
crew_class_controller$empty(controllers = NULL)
Arguments
controllers

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Details

A controller is empty if it has no running tasks or completed tasks waiting to be retrieved with push().

Returns

TRUE if the controller is empty, FALSE otherwise.


Method nonempty()

Check if the controller is nonempty.

Usage
crew_class_controller$nonempty(controllers = NULL)
Arguments
controllers

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Details

A controller is empty if it has no running tasks or completed tasks waiting to be retrieved with push().

Returns

TRUE if the controller is empty, FALSE otherwise.


Method resolved()

Number of resolved mirai() tasks.

Usage
crew_class_controller$resolved(controllers = NULL)
Arguments
controllers

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Details

resolved() is cumulative: it counts all the resolved tasks over the entire lifetime of the controller session.

Returns

Non-negative integer of length 1, number of resolved mirai() tasks. The return value is 0 if the condition variable does not exist (i.e. if the client is not running).


Method unresolved()

Number of unresolved mirai() tasks.

Usage
crew_class_controller$unresolved(controllers = NULL)
Arguments
controllers

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Returns

Non-negative integer of length 1, number of unresolved mirai() tasks.


Method unpopped()

Number of resolved mirai() tasks available via pop().

Usage
crew_class_controller$unpopped(controllers = NULL)
Arguments
controllers

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Returns

Non-negative integer of length 1, number of resolved mirai() tasks available via pop().


Method saturated()

Check if the controller is saturated.

Usage
crew_class_controller$saturated(
  collect = NULL,
  throttle = NULL,
  controller = NULL
)
Arguments
collect

Deprecated in version 0.5.0.9003 (2023-10-02). Not used.

throttle

Deprecated in version 0.5.0.9003 (2023-10-02). Not used.

controller

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Details

A controller is saturated if the number of unresolved tasks is greater than or equal to the maximum number of workers. In other words, in a saturated controller, every available worker has a task. You can still push tasks to a saturated controller, but tools that use crew such as targets may choose not to.

Returns

TRUE if the controller is saturated, FALSE otherwise.


Method start()

Start the controller if it is not already started.

Usage
crew_class_controller$start(controllers = NULL)
Arguments
controllers

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Details

Register the mirai client and register worker websockets with the launcher.

Returns

NULL (invisibly).


Method started()

Check whether the controller is started.

Usage
crew_class_controller$started(controllers = NULL)
Arguments
controllers

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Details

Actually checks whether the client is started.

Returns

TRUE if the controller is started, FALSE otherwise.


Method launch()

Launch one or more workers.

Usage
crew_class_controller$launch(n = 1L, controllers = NULL)
Arguments
n

Number of workers to try to launch. The actual number launched is capped so that no more than "workers" workers running at a given time, where "workers" is an argument of crew_controller(). The actual cap is the "workers" argument minus the number of connected workers minus the number of starting workers. A "connected" worker has an active websocket connection to the mirai client, and "starting" means that the worker was launched at most seconds_start seconds ago, where seconds_start is also an argument of crew_controller().

controllers

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Returns

NULL (invisibly).


Method scale()

Auto-scale workers out to meet the demand of tasks.

Usage
crew_class_controller$scale(throttle = TRUE, controllers = NULL)
Arguments
throttle

TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.

controllers

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Details

The scale() method re-launches all inactive backlogged workers, then any additional inactive workers needed to accommodate the demand of unresolved tasks. A worker is "backlogged" if it was assigned more tasks than it has completed so far.

Methods push(), pop(), and wait() already invoke scale() if the scale argument is TRUE. For finer control of the number of workers launched, call launch() on the controller with the exact desired number of workers.

Returns

NULL (invisibly).


Method autoscale()

Run worker auto-scaling in a private later loop every controller$client$seconds_interval seconds.

Usage
crew_class_controller$autoscale(controllers = NULL)
Arguments
controllers

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Details

Call controller$descale() to terminate the auto-scaling loop.

Returns

NULL (invisibly).


Method descale()

Terminate the auto-scaling loop started by controller$autoscale().

Usage
crew_class_controller$descale(controllers = NULL)
Arguments
controllers

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Returns

NULL (invisibly).


Method push()

Push a task to the head of the task list.

Usage
crew_class_controller$push(
  command,
  data = list(),
  globals = list(),
  substitute = TRUE,
  seed = NULL,
  algorithm = NULL,
  packages = character(0),
  library = NULL,
  seconds_timeout = NULL,
  scale = TRUE,
  throttle = TRUE,
  name = NA_character_,
  save_command = FALSE,
  controller = NULL
)
Arguments
command

Language object with R code to run.

data

Named list of local data objects in the evaluation environment.

globals

Named list of objects to temporarily assign to the global environment for the task. This list should include any functions you previously defined in the global environment which are required to run tasks. See the reset_globals argument of crew_controller_local().

substitute

Logical of length 1, whether to call base::substitute() on the supplied value of the command argument. If TRUE (default) then command is quoted literally as you write it, e.g. push(command = your_function_call()). If FALSE, then crew assumes command is a language object and you are passing its value, e.g. push(command = quote(your_function_call())). substitute = TRUE is appropriate for interactive use, whereas substitute = FALSE is meant for automated R programs that invoke crew controllers.

seed

Integer of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the seed argument of set.seed() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.

algorithm

Integer of length 1 with the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the kind argument of RNGkind() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.

packages

Character vector of packages to load for the task.

library

Library path to load the packages. See the lib.loc argument of require().

seconds_timeout

Optional task timeout passed to the .timeout argument of mirai::mirai() (after converting to milliseconds).

scale

Logical, whether to automatically call scale() to auto-scale workers to meet the demand of the task load. Also see the throttle argument.

throttle

TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.

name

Optional name of the task.

save_command

Logical of length 1. If TRUE, the controller deparses the command and returns it with the output on pop(). If FALSE (default), the controller skips this step to increase speed.

controller

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Returns

Invisibly return the mirai object of the pushed task. This allows you to interact with the task directly, e.g. to create a promise object with promises::as.promise().


Method walk()

Apply a single command to multiple inputs, and return control to the user without waiting for any task to complete.

Usage
crew_class_controller$walk(
  command,
  iterate,
  data = list(),
  globals = list(),
  substitute = TRUE,
  seed = NULL,
  algorithm = NULL,
  packages = character(0),
  library = NULL,
  seconds_timeout = NULL,
  names = NULL,
  save_command = FALSE,
  scale = TRUE,
  throttle = TRUE,
  controller = NULL
)
Arguments
command

Language object with R code to run.

iterate

Named list of vectors or lists to iterate over. For example, to run function calls f(x = 1, y = "a") and f(x = 2, y = "b"), set command to f(x, y), and set iterate to list(x = c(1, 2), y = c("a", "b")). The individual function calls are evaluated as f(x = iterate$x[[1]], y = iterate$y[[1]]) and f(x = iterate$x[[2]], y = iterate$y[[2]]). All the elements of iterate must have the same length. If there are any name conflicts between iterate and data, iterate takes precedence.

data

Named list of constant local data objects in the evaluation environment. Objects in this list are treated as single values and are held constant for each iteration of the map.

globals

Named list of constant objects to temporarily assign to the global environment for each task. This list should include any functions you previously defined in the global environment which are required to run tasks. See the reset_globals argument of crew_controller_local(). Objects in this list are treated as single values and are held constant for each iteration of the map.

substitute

Logical of length 1, whether to call base::substitute() on the supplied value of the command argument. If TRUE (default) then command is quoted literally as you write it, e.g. push(command = your_function_call()). If FALSE, then crew assumes command is a language object and you are passing its value, e.g. push(command = quote(your_function_call())). substitute = TRUE is appropriate for interactive use, whereas substitute = FALSE is meant for automated R programs that invoke crew controllers.

seed

Integer of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the seed argument of set.seed() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.

algorithm

Integer of length 1 with the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the kind argument of RNGkind() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.

packages

Character vector of packages to load for the task.

library

Library path to load the packages. See the lib.loc argument of require().

seconds_timeout

Optional task timeout passed to the .timeout argument of mirai::mirai() (after converting to milliseconds).

names

Optional character of length 1, name of the element of iterate with names for the tasks. If names is supplied, then iterate[[names]] must be a character vector.

save_command

Logical of length 1, whether to store a text string version of the R command in the output.

scale

Logical, whether to automatically scale workers to meet demand. See also the throttle argument.

throttle

TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.

controller

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Details

In contrast to walk(), map() blocks the local R session and waits for all tasks to complete.

Returns

Invisibly returns a list of mirai task objects for the newly created tasks. The order of tasks in the list matches the order of data in the iterate argument.


Method map()

Apply a single command to multiple inputs, wait for all tasks to complete, and return the results of all tasks.

Usage
crew_class_controller$map(
  command,
  iterate,
  data = list(),
  globals = list(),
  substitute = TRUE,
  seed = NULL,
  algorithm = NULL,
  packages = character(0),
  library = NULL,
  seconds_interval = 0.5,
  seconds_timeout = NULL,
  names = NULL,
  save_command = FALSE,
  error = "stop",
  warnings = TRUE,
  verbose = interactive(),
  scale = TRUE,
  throttle = TRUE,
  controller = NULL
)
Arguments
command

Language object with R code to run.

iterate

Named list of vectors or lists to iterate over. For example, to run function calls f(x = 1, y = "a") and f(x = 2, y = "b"), set command to f(x, y), and set iterate to list(x = c(1, 2), y = c("a", "b")). The individual function calls are evaluated as f(x = iterate$x[[1]], y = iterate$y[[1]]) and f(x = iterate$x[[2]], y = iterate$y[[2]]). All the elements of iterate must have the same length. If there are any name conflicts between iterate and data, iterate takes precedence.

data

Named list of constant local data objects in the evaluation environment. Objects in this list are treated as single values and are held constant for each iteration of the map.

globals

Named list of constant objects to temporarily assign to the global environment for each task. This list should include any functions you previously defined in the global environment which are required to run tasks. See the reset_globals argument of crew_controller_local(). Objects in this list are treated as single values and are held constant for each iteration of the map.

substitute

Logical of length 1, whether to call base::substitute() on the supplied value of the command argument. If TRUE (default) then command is quoted literally as you write it, e.g. push(command = your_function_call()). If FALSE, then crew assumes command is a language object and you are passing its value, e.g. push(command = quote(your_function_call())). substitute = TRUE is appropriate for interactive use, whereas substitute = FALSE is meant for automated R programs that invoke crew controllers.

seed

Integer of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the seed argument of set.seed() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.

algorithm

Integer of length 1 with the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the kind argument of RNGkind() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.

packages

Character vector of packages to load for the task.

library

Library path to load the packages. See the lib.loc argument of require().

seconds_interval

Number of seconds to wait between auto-scaling operations while waiting for tasks to complete.

seconds_timeout

Optional task timeout passed to the .timeout argument of mirai::mirai() (after converting to milliseconds).

names

Optional character of length 1, name of the element of iterate with names for the tasks. If names is supplied, then iterate[[names]] must be a character vector.

save_command

Logical of length 1, whether to store a text string version of the R command in the output.

error

Character of length 1, choice of action if a task has an error. Possible values:

  • "stop": throw an error in the main R session instead of returning a value. In case of an error, the results from the last errored map() are in the error field of the controller, e.g. controller_object$error. To reduce memory consumption, set controller_object$error <- NULL after you are finished troubleshooting.

  • "warn": throw a warning. This allows the return value with all the error messages and tracebacks to be generated.

  • "silent": do nothing special.

warnings

Logical of length 1, whether to throw a warning in the interactive session if at least one task encounters an error.

verbose

Logical of length 1, whether to print progress messages.

scale

Logical, whether to automatically scale workers to meet demand. See also the throttle argument.

throttle

TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.

controller

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Details

map() cannot be used unless all prior tasks are completed and popped. You may need to wait and then pop them manually. Alternatively, you can start over: either call terminate() on the current controller object to reset it, or create a new controller object entirely.

Returns

A tibble of results and metadata: one row per task and columns corresponding to the output of pop().


Method pop()

Pop a completed task from the results data frame.

Usage
crew_class_controller$pop(
  scale = TRUE,
  collect = NULL,
  throttle = TRUE,
  error = NULL,
  controllers = NULL
)
Arguments
scale

Logical of length 1, whether to automatically call scale() to auto-scale workers to meet the demand of the task load. Scaling up on pop() may be important for transient or nearly transient workers that tend to drop off quickly after doing little work. See also the throttle argument.

collect

Deprecated in version 0.5.0.9003 (2023-10-02).

throttle

TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.

error

NULL or character of length 1, choice of action if the popped task threw an error. Possible values:

  • "stop": throw an error in the main R session instead of returning a value.

  • "warn": throw a warning.

  • NULL or "silent": do not react to errors.

controllers

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Details

If not task is currently completed, pop() will attempt to auto-scale workers as needed.

Returns

If there is no task to collect, return NULL. Otherwise, return a one-row tibble with the following columns.


Method collect()

Pop all available task results and return them in a tidy tibble.

Usage
crew_class_controller$collect(
  scale = TRUE,
  throttle = TRUE,
  error = NULL,
  controllers = NULL
)
Arguments
scale

Logical of length 1, whether to automatically call scale() to auto-scale workers to meet the demand of the task load.

throttle

TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.

error

NULL or character of length 1, choice of action if the popped task threw an error. Possible values:

  • "stop": throw an error in the main R session instead of returning a value.

  • "warn": throw a warning.

  • NULL or "silent": do not react to errors.

controllers

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Returns

A tibble of results and metadata of all resolved tasks, with one row per task. Returns NULL if there are no tasks to collect.


Method promise()

Create a promises::promise() object to asynchronously pop or collect one or more tasks.

Usage
crew_class_controller$promise(
  mode = "one",
  seconds_interval = 0.1,
  scale = NULL,
  throttle = NULL,
  controllers = NULL
)
Arguments
mode

Character of length 1, what kind of promise to create. mode must be "one" or "all". Details:

  • If mode is "one", then the promise is fulfilled (or rejected) when at least one task is resolved and available to pop(). When that happens, pop() runs asynchronously, pops a result off the task list, and returns a value. If the task succeeded, then the promise is fulfilled and its value is the result of pop() (a one-row tibble with the result and metadata). If the task threw an error, the error message of the task is forwarded to any error callbacks registered with the promise.

  • If mode is "all", then the promise is fulfilled (or rejected) when there are no unresolved tasks left in the controller. (Be careful: this condition is trivially met in the moment if the controller is empty and you have not submitted any tasks, so it is best to create this kind of promise only after you submit tasks.) When there are no unresolved tasks left, collect() runs asynchronously, pops all available results off the task list, and returns a value. If the task succeeded, then the promise is fulfilled and its value is the result of collect() (a tibble with one row per task result). If any of the tasks threw an error, then the first error message detected is forwarded to any error callbacks registered with the promise.

seconds_interval

Positive numeric of length 1, delay in the later::later() polling interval to asynchronously check if the promise can be resolved.

scale

Deprecated on 2024-04-10 (version 0.9.1.9003) and no longer used. Now, promise() always turns on auto-scaling in a private later loop (if not already activated).

throttle

Deprecated on 2024-04-10 (version 0.9.1.9003) and no longer used. Now, promise() always turns on auto-scaling in a private later loop (if not already activated).

controllers

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Details

Please be aware that pop() or collect() will happen asynchronously at a some unpredictable time after the promise object is created, even if your local R process appears to be doing something completely different. This behavior is highly desirable in a Shiny reactive context, but please be careful as it may be surprising in other situations.

Returns

A promises::promise() object whose eventual value will be a tibble with results from one or more popped tasks. If mode = "one", only one task is popped and returned (one row). If mode = "all", then all the tasks are returned in a tibble with one row per task (or NULL is returned if there are no tasks to pop).


Method wait()

Wait for tasks.

Usage
crew_class_controller$wait(
  mode = "all",
  seconds_interval = 0.5,
  seconds_timeout = Inf,
  scale = TRUE,
  throttle = TRUE,
  controllers = NULL
)
Arguments
mode

Character of length 1: "all" to wait for all tasks to complete, "one" to wait for a single task to complete.

seconds_interval

Number of seconds to interrupt the wait in order to scale up workers as needed.

seconds_timeout

Timeout length in seconds waiting for tasks.

scale

Logical, whether to automatically call scale() to auto-scale workers to meet the demand of the task load. See also the throttle argument.

throttle

TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.

controllers

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Details

The wait() method blocks the calling R session and repeatedly auto-scales workers for tasks that need them. The function runs until it either times out or the condition in mode is met.

Returns

A logical of length 1, invisibly. TRUE if the condition in mode was met, FALSE otherwise.


Method push_backlog()

Push the name of a task to the backlog.

Usage
crew_class_controller$push_backlog(name, controller = NULL)
Arguments
name

Character of length 1 with the task name to push to the backlog.

controller

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Details

pop_backlog() pops the tasks that can be pushed without saturating the controller.

Returns

NULL (invisibly).


Method pop_backlog()

Pop the task names from the head of the backlog which can be pushed without saturating the controller.

Usage
crew_class_controller$pop_backlog(controllers = NULL)
Arguments
controllers

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Returns

Character vector of task names which can be pushed to the controller without saturating it. If the controller is saturated, character(0L) is returned.


Method summary()

Summarize the workers and tasks of the controller.

Usage
crew_class_controller$summary(controllers = NULL)
Arguments
controllers

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Returns

A data frame of summary statistics on the workers and tasks. It has one row per worker websocket and the following columns:


Method terminate()

Terminate the workers and the mirai client.

Usage
crew_class_controller$terminate(controllers = NULL)
Arguments
controllers

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Returns

NULL (invisibly).

See Also

Other controller: crew_controller()

Examples

if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
launcher <- crew_launcher_local()
controller <- crew_controller(client = client, launcher = launcher)
controller$start()
controller$push(name = "task", command = sqrt(4))
controller$wait()
controller$pop()
controller$terminate()
}

## ------------------------------------------------
## Method `crew_class_controller$new`
## ------------------------------------------------

if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
launcher <- crew_launcher_local()
controller <- crew_controller(client = client, launcher = launcher)
controller$start()
controller$push(name = "task", command = sqrt(4))
controller$wait()
controller$pop()
controller$terminate()
}

[Package crew version 0.9.5 Index]