| crew_class_controller_group {crew} | R Documentation |
Controller group class
Description
R6 class for controller groups.
Details
Active bindings
controllers: List of R6 controller objects.
relay: Relay object for event-driven programming on a downstream condition variable.
Methods
Public methods
Method new()
Multi-controller constructor.
Usage
crew_class_controller_group$new(controllers = NULL, relay = NULL)
Arguments
controllers: List of R6 controller objects.
relay: Relay object for event-driven programming on a downstream condition variable.
Returns
An R6 object with the controller group object.
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  # Create two local controllers and combine them into a group.
  persistent <- crew_controller_local(name = "persistent")
  transient <- crew_controller_local(
    name = "transient",
    tasks_max = 1L
  )
  group <- crew_controller_group(persistent, transient)
  group$start()
  # Submit a task to the "transient" controller, wait, then retrieve it.
  group$push(name = "task", command = sqrt(4), controller = "transient")
  group$wait()
  group$pop()
  group$terminate()
}
Method validate()
Validate the controller group.
Usage
crew_class_controller_group$validate()
Returns
NULL (invisibly).
Method empty()
See if the controllers are empty.
Usage
crew_class_controller_group$empty(controllers = NULL)
Arguments
controllers: Character vector of controller names. Set to NULL to select all controllers.
Details
A controller is empty if it has no running tasks
and no completed tasks waiting to be retrieved with pop().
Returns
TRUE if all the selected controllers are empty,
FALSE otherwise.
Method nonempty()
Check if the controller group is nonempty.
Usage
crew_class_controller_group$nonempty(controllers = NULL)
Arguments
controllers: Character vector of controller names. Set to NULL to select all controllers.
Details
A controller is empty if it has no running tasks
and no completed tasks waiting to be retrieved with pop().
Returns
TRUE if at least one of the selected controllers is nonempty,
FALSE otherwise.
Method resolved()
Number of resolved mirai() tasks.
Usage
crew_class_controller_group$resolved(controllers = NULL)
Arguments
controllers: Character vector of controller names. Set to NULL to select all controllers.
Details
resolved() is cumulative: it counts all the resolved
tasks over the entire lifetime of the controller session.
Returns
Non-negative integer of length 1,
number of resolved mirai() tasks.
The return value is 0 if the condition variable does not exist
(i.e. if the client is not running).
Method unresolved()
Number of unresolved mirai() tasks.
Usage
crew_class_controller_group$unresolved(controllers = NULL)
Arguments
controllers: Character vector of controller names. Set to NULL to select all controllers.
Returns
Non-negative integer of length 1,
number of unresolved mirai() tasks.
Method unpopped()
Number of resolved mirai() tasks available via pop().
Usage
crew_class_controller_group$unpopped(controllers = NULL)
Arguments
controllers: Character vector of controller names. Set to NULL to select all controllers.
Returns
Non-negative integer of length 1,
number of resolved mirai() tasks available via pop().
Method saturated()
Check if a controller is saturated.
Usage
crew_class_controller_group$saturated(collect = NULL, throttle = NULL, controller = NULL)
Arguments
collect: Deprecated in version 0.5.0.9003 (2023-10-02). Not used.
throttle: Deprecated in version 0.5.0.9003 (2023-10-02). Not used.
controller: Character vector of length 1 with the controller name. Set to NULL to select the default controller that push() would choose.
Details
A controller is saturated if the number of unresolved tasks
is greater than or equal to the maximum number of workers.
In other words, in a saturated controller, every available worker
has a task.
You can still push tasks to a saturated controller, but
tools that use crew such as targets may choose not to.
Returns
TRUE if the selected controller is saturated,
FALSE otherwise.
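As a rough illustration of the saturation rule described in the Details above, the following base R sketch compares a count of unresolved tasks with a maximum number of workers. The helper is_saturated() and its arguments are hypothetical names used only for this illustration; they are not part of crew.

```r
# Hypothetical helper (not part of crew): the saturation rule as stated above.
is_saturated <- function(unresolved, workers) {
  stopifnot(unresolved >= 0, workers >= 0)
  # Saturated when unresolved tasks meet or exceed the maximum worker count.
  unresolved >= workers
}

is_saturated(unresolved = 1L, workers = 2L) # one worker is still free
is_saturated(unresolved = 2L, workers = 2L) # every worker has a task
```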
Method start()
Start one or more controllers.
Usage
crew_class_controller_group$start(controllers = NULL)
Arguments
controllers: Character vector of controller names. Set to NULL to select all controllers.
Returns
NULL (invisibly).
Method started()
Check whether all the given controllers are started.
Usage
crew_class_controller_group$started(controllers = NULL)
Arguments
controllers: Character vector of controller names. Set to NULL to select all controllers.
Details
Actually checks whether all the given clients are started.
Returns
TRUE if the controllers are started, FALSE if any are not.
Method launch()
Launch one or more workers on one or more controllers.
Usage
crew_class_controller_group$launch(n = 1L, controllers = NULL)
Arguments
n: Number of workers to launch in each controller selected.
controllers: Character vector of controller names. Set to NULL to select all controllers.
Returns
NULL (invisibly).
Method scale()
Automatically scale up the number of workers if needed in one or more controller objects.
Usage
crew_class_controller_group$scale(throttle = TRUE, controllers = NULL)
Arguments
throttle: TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
controllers: Character vector of controller names. Set to NULL to select all controllers.
Details
See the scale() method in individual controller classes.
Returns
NULL (invisibly).
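The throttling behavior described for the throttle argument can be pictured with a small closure. This is only a sketch under stated assumptions, not crew's implementation: make_throttled_scaler(), its action argument, and the injectable clock are hypothetical names introduced here so the logic can be checked without sleeping.

```r
# Hypothetical sketch (not crew's implementation) of throttled auto-scaling.
# `clock` is injectable so the behavior can be checked without sleeping.
make_throttled_scaler <- function(
  action,
  seconds_interval,
  clock = function() proc.time()[["elapsed"]]
) {
  last_run <- -Inf
  function(throttle = TRUE) {
    now <- clock()
    if (throttle && (now - last_run) < seconds_interval) {
      return(invisible(FALSE)) # skipped: scaling happened too recently
    }
    last_run <<- now
    action()
    invisible(TRUE)
  }
}

# Usage with a fake clock that the example advances by hand.
fake_time <- 0
runs <- 0
scale_sketch <- make_throttled_scaler(
  action = function() runs <<- runs + 1,
  seconds_interval = 0.5,
  clock = function() fake_time
)
first <- scale_sketch()                  # runs: no earlier scaling on record
second <- scale_sketch()                 # skipped: 0 seconds since the last run
forced <- scale_sketch(throttle = FALSE) # runs regardless of timing
fake_time <- 1
after_interval <- scale_sketch()         # runs: 1 second has elapsed
```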
Method autoscale()
Run worker auto-scaling in a private later loop
every controller$client$seconds_interval seconds.
Usage
crew_class_controller_group$autoscale(controllers = NULL)
Arguments
controllers: Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Returns
NULL (invisibly).
Method descale()
Terminate the auto-scaling loop started by
controller$autoscale().
Usage
crew_class_controller_group$descale(controllers = NULL)
Arguments
controllers: Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Returns
NULL (invisibly).
Method push()
Push a task to the head of the task list.
Usage
crew_class_controller_group$push(
  command,
  data = list(),
  globals = list(),
  substitute = TRUE,
  seed = NULL,
  algorithm = NULL,
  packages = character(0),
  library = NULL,
  seconds_timeout = NULL,
  scale = TRUE,
  throttle = TRUE,
  name = NA_character_,
  save_command = FALSE,
  controller = NULL
)
Arguments
command: Language object with R code to run.
data: Named list of local data objects in the evaluation environment.
globals: Named list of objects to temporarily assign to the global environment for the task. See the reset_globals argument of crew_controller_local().
substitute: Logical of length 1, whether to call base::substitute() on the supplied value of the command argument. If TRUE (default) then command is quoted literally as you write it, e.g. push(command = your_function_call()). If FALSE, then crew assumes command is a language object and you are passing its value, e.g. push(command = quote(your_function_call())). substitute = TRUE is appropriate for interactive use, whereas substitute = FALSE is meant for automated R programs that invoke crew controllers.
seed: Integer of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the seed argument of set.seed() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
algorithm: Integer of length 1 with the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the kind argument of RNGkind() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
packages: Character vector of packages to load for the task.
library: Library path to load the packages. See the lib.loc argument of require().
seconds_timeout: Optional task timeout passed to the .timeout argument of mirai::mirai() (after converting to milliseconds).
scale: Logical, whether to automatically scale workers to meet demand. See the scale argument of the push() method of ordinary single controllers.
throttle: TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
name: Optional name of the task. Replaced with a random name if NULL or in conflict with an existing name in the task list.
save_command: Logical of length 1. If TRUE, the controller deparses the command and returns it with the output on pop(). If FALSE (default), the controller skips this step to increase speed.
controller: Character of length 1, name of the controller to submit the task. If NULL, the controller defaults to the first controller in the list.
Returns
Invisibly return the mirai object of the pushed task.
This allows you to interact with the task directly, e.g.
to create a promise object with promises::as.promise().
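The difference between substitute = TRUE and substitute = FALSE in the arguments above may be easier to see in plain base R. The sketch below uses a hypothetical helper, capture_command(), which is not part of crew and only imitates the documented quoting behavior.

```r
# Hypothetical helper (not part of crew): imitates the documented
# `substitute` switch using only base R.
capture_command <- function(command, substitute = TRUE) {
  if (substitute) {
    # Quote the expression exactly as the caller wrote it, without running it.
    command <- base::substitute(command)
  }
  # Otherwise `command` is assumed to already hold a language object.
  command
}

# Interactive style: write the code directly.
interactive_style <- capture_command(sqrt(4))

# Programmatic style: pass a language object by value.
programmatic_style <- capture_command(quote(sqrt(4)), substitute = FALSE)

identical(interactive_style, programmatic_style)
eval(interactive_style)
```

Both styles end up with the same unevaluated call, which is why automated programs that already hold a language object would use substitute = FALSE.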
Method walk()
Apply a single command to multiple inputs, and return control to the user without waiting for any task to complete.
Usage
crew_class_controller_group$walk(
  command,
  iterate,
  data = list(),
  globals = list(),
  substitute = TRUE,
  seed = NULL,
  algorithm = NULL,
  packages = character(0),
  library = NULL,
  seconds_timeout = NULL,
  names = NULL,
  save_command = FALSE,
  scale = TRUE,
  throttle = TRUE,
  controller = NULL
)
Arguments
command: Language object with R code to run.
iterate: Named list of vectors or lists to iterate over. For example, to run function calls f(x = 1, y = "a") and f(x = 2, y = "b"), set command to f(x, y), and set iterate to list(x = c(1, 2), y = c("a", "b")). The individual function calls are evaluated as f(x = iterate$x[[1]], y = iterate$y[[1]]) and f(x = iterate$x[[2]], y = iterate$y[[2]]). All the elements of iterate must have the same length. If there are any name conflicts between iterate and data, iterate takes precedence.
data: Named list of constant local data objects in the evaluation environment. Objects in this list are treated as single values and are held constant for each iteration of the map.
globals: Named list of constant objects to temporarily assign to the global environment for each task. This list should include any functions you previously defined in the global environment which are required to run tasks. See the reset_globals argument of crew_controller_local(). Objects in this list are treated as single values and are held constant for each iteration of the map.
substitute: Logical of length 1, whether to call base::substitute() on the supplied value of the command argument. If TRUE (default) then command is quoted literally as you write it, e.g. push(command = your_function_call()). If FALSE, then crew assumes command is a language object and you are passing its value, e.g. push(command = quote(your_function_call())). substitute = TRUE is appropriate for interactive use, whereas substitute = FALSE is meant for automated R programs that invoke crew controllers.
seed: Integer of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the seed argument of set.seed() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
algorithm: Integer of length 1 with the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the kind argument of RNGkind() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
packages: Character vector of packages to load for the task.
library: Library path to load the packages. See the lib.loc argument of require().
seconds_timeout: Optional task timeout passed to the .timeout argument of mirai::mirai() (after converting to milliseconds).
names: Optional character of length 1, name of the element of iterate with names for the tasks. If names is supplied, then iterate[[names]] must be a character vector.
save_command: Logical of length 1, whether to store a text string version of the R command in the output.
scale: Logical, whether to automatically scale workers to meet demand. See also the throttle argument.
throttle: TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
controller: Character of length 1, name of the controller to submit the tasks. If NULL, the controller defaults to the first controller in the list.
Details
In contrast to walk(), map() blocks the local R session
and waits for all tasks to complete.
Returns
Invisibly returns a list of mirai task objects for the
newly created tasks. The order of tasks in the list matches the
order of data in the iterate argument.
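To make the iterate and data semantics described in the arguments more concrete, here is a base R sketch that evaluates one command once per element of iterate, with iterate taking precedence over data on a name conflict. It runs locally and sequentially and is not how crew dispatches tasks; the function f and the contents of data are made up for the illustration.

```r
# Base R sketch (not crew's implementation) of how `iterate` and `data`
# combine into one evaluation per task.
f <- function(x, y) paste(x, y)
command <- quote(f(x, y))
iterate <- list(x = c(1, 2), y = c("a", "b"))
data <- list(y = "constant", z = TRUE) # `y` conflicts with `iterate`

top <- environment() # the environment where `f` is defined
results <- lapply(seq_along(iterate[[1L]]), function(index) {
  # Take the index-th element of every entry in `iterate`.
  inputs <- lapply(iterate, function(values) values[[index]])
  # On a name conflict, the element from `iterate` replaces the one in `data`.
  task_data <- utils::modifyList(data, inputs)
  eval(command, envir = task_data, enclos = top)
})
results
```

The order of results matches the order of the elements in iterate, mirroring the ordering guarantee stated in the Returns section.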
Method map()
Apply a single command to multiple inputs.
Usage
crew_class_controller_group$map(
  command,
  iterate,
  data = list(),
  globals = list(),
  substitute = TRUE,
  seed = NULL,
  algorithm = NULL,
  packages = character(0),
  library = NULL,
  seconds_interval = 0.5,
  seconds_timeout = NULL,
  names = NULL,
  save_command = FALSE,
  error = "stop",
  warnings = TRUE,
  verbose = interactive(),
  scale = TRUE,
  throttle = TRUE,
  controller = NULL
)
Arguments
command: Language object with R code to run.
iterate: Named list of vectors or lists to iterate over. For example, to run function calls f(x = 1, y = "a") and f(x = 2, y = "b"), set command to f(x, y), and set iterate to list(x = c(1, 2), y = c("a", "b")). The individual function calls are evaluated as f(x = iterate$x[[1]], y = iterate$y[[1]]) and f(x = iterate$x[[2]], y = iterate$y[[2]]). All the elements of iterate must have the same length. If there are any name conflicts between iterate and data, iterate takes precedence.
data: Named list of constant local data objects in the evaluation environment. Objects in this list are treated as single values and are held constant for each iteration of the map.
globals: Named list of constant objects to temporarily assign to the global environment for each task. This list should include any functions you previously defined in the global environment which are required to run tasks. See the reset_globals argument of crew_controller_local(). Objects in this list are treated as single values and are held constant for each iteration of the map.
substitute: Logical of length 1, whether to call base::substitute() on the supplied value of the command argument. If TRUE (default) then command is quoted literally as you write it, e.g. push(command = your_function_call()). If FALSE, then crew assumes command is a language object and you are passing its value, e.g. push(command = quote(your_function_call())). substitute = TRUE is appropriate for interactive use, whereas substitute = FALSE is meant for automated R programs that invoke crew controllers.
seed: Integer of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the seed argument of set.seed() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
algorithm: Integer of length 1 with the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the kind argument of RNGkind() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
packages: Character vector of packages to load for the task.
library: Library path to load the packages. See the lib.loc argument of require().
seconds_interval: Number of seconds to wait between auto-scaling operations while waiting for tasks to complete.
seconds_timeout: Optional task timeout passed to the .timeout argument of mirai::mirai() (after converting to milliseconds).
names: Optional character of length 1, name of the element of iterate with names for the tasks. If names is supplied, then iterate[[names]] must be a character vector.
save_command: Logical of length 1, whether to store a text string version of the R command in the output.
error: Character vector of length 1, choice of action if a task has an error. Possible values:
- "stop": throw an error in the main R session instead of returning a value. In case of an error, the results from the last errored map() are in the error field of the controller, e.g. controller_object$error. To reduce memory consumption, set controller_object$error <- NULL after you are finished troubleshooting.
- "warn": throw a warning. This allows the return value with all the error messages and tracebacks to be generated.
- "silent": do nothing special.
warnings: Logical of length 1, whether to throw a warning in the interactive session if at least one task encounters an error.
verbose: Logical of length 1, whether to print progress messages.
scale: Logical, whether to automatically scale workers to meet demand. See also the throttle argument.
throttle: TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
controller: Character of length 1, name of the controller to submit the tasks. If NULL, the controller defaults to the first controller in the list.
Details
The idea comes from functional programming: for example,
the map() function from the purrr package.
Returns
A tibble of results and metadata: one row per task and
columns corresponding to the output of pop().
Method pop()
Pop a completed task from the results data frame.
Usage
crew_class_controller_group$pop(
  scale = TRUE,
  collect = NULL,
  throttle = TRUE,
  error = NULL,
  controllers = NULL
)
Arguments
scale: Logical, whether to automatically scale workers to meet demand. See the scale argument of the pop() method of ordinary single controllers.
collect: Deprecated in version 0.5.0.9003 (2023-10-02). Not used.
throttle: TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
error: NULL or character of length 1, choice of action if the popped task threw an error. Possible values:
- "stop": throw an error in the main R session instead of returning a value.
- "warn": throw a warning.
- NULL or "silent": do not react to errors.
controllers: Character vector of controller names. Set to NULL to select all controllers.
Returns
If there is no task to collect, return NULL. Otherwise,
return a one-row tibble with the same columns as pop()
for ordinary controllers.
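The three choices for the error argument listed in the arguments can be sketched with a small base R dispatcher. The helper react_to_task_error() and its task_error argument are hypothetical and not part of crew; the sketch only mirrors the documented choices and says nothing about how crew implements them.

```r
# Hypothetical helper (not part of crew) mirroring the documented choices
# for the `error` argument: "stop", "warn", and NULL or "silent".
react_to_task_error <- function(task_error, error = NULL) {
  action <- if (is.null(error)) "silent" else error
  switch(
    action,
    stop = stop(task_error, call. = FALSE),
    warn = warning(task_error, call. = FALSE),
    silent = invisible(NULL),
    stop("unsupported value of error: ", action, call. = FALSE)
  )
}

# Usage: a failed task whose error message is "task failed".
quiet <- react_to_task_error("task failed") # NULL or "silent": no reaction
stopped <- tryCatch(
  react_to_task_error("task failed", error = "stop"),
  error = function(condition) conditionMessage(condition)
)
warned <- tryCatch(
  react_to_task_error("task failed", error = "warn"),
  warning = function(condition) conditionMessage(condition)
)
```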
Method collect()
Pop all available task results and return them in a tidy
tibble.
Usage
crew_class_controller_group$collect(
  scale = TRUE,
  throttle = TRUE,
  error = NULL,
  controllers = NULL
)
Arguments
scale: Logical of length 1, whether to automatically call scale() to auto-scale workers to meet the demand of the task load.
throttle: TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
error: NULL or character of length 1, choice of action if the popped task threw an error. Possible values:
- "stop": throw an error in the main R session instead of returning a value.
- "warn": throw a warning.
- NULL or "silent": do not react to errors.
controllers: Character vector of controller names. Set to NULL to select all controllers.
Returns
A tibble of results and metadata of all resolved tasks,
with one row per task. Returns NULL if there are no available
results.
Method promise()
Create a promises::promise() object to asynchronously
pop or collect one or more tasks.
Usage
crew_class_controller_group$promise(
  mode = "one",
  seconds_interval = 0.1,
  scale = NULL,
  throttle = NULL,
  controllers = NULL
)
Arguments
mode: Character of length 1, what kind of promise to create. mode must be "one" or "all". Details:
- If mode is "one", then the promise is fulfilled (or rejected) when at least one task is resolved and available to pop(). When that happens, pop() runs asynchronously, pops a result off the task list, and returns a value. If the task succeeded, then the promise is fulfilled and its value is the result of pop() (a one-row tibble with the result and metadata). If the task threw an error, the error message of the task is forwarded to any error callbacks registered with the promise.
- If mode is "all", then the promise is fulfilled (or rejected) when there are no unresolved tasks left in the controller. (Be careful: this condition is trivially met in the moment if the controller is empty and you have not submitted any tasks, so it is best to create this kind of promise only after you submit tasks.) When there are no unresolved tasks left, collect() runs asynchronously, pops all available results off the task list, and returns a value. If the task succeeded, then the promise is fulfilled and its value is the result of collect() (a tibble with one row per task result). If any of the tasks threw an error, then the first error message detected is forwarded to any error callbacks registered with the promise.
seconds_interval: Positive numeric of length 1, delay in the later::later() polling interval to asynchronously check if the promise can be resolved.
scale: Deprecated on 2024-04-10 (version 0.9.1.9003) and no longer used. Now, promise() always turns on auto-scaling in a private later loop (if not already activated).
throttle: Deprecated on 2024-04-10 (version 0.9.1.9003) and no longer used. Now, promise() always turns on auto-scaling in a private later loop (if not already activated).
controllers: Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Details
Please be aware that pop() or collect() will happen
asynchronously at some unpredictable time after the promise object
is created, even if your local R process appears to be doing
something completely different. This behavior is highly desirable
in a Shiny reactive context, but please be careful as it may be
surprising in other situations.
Returns
A promises::promise() object whose eventual value will
be a tibble with results from one or more popped tasks.
If mode = "one", only one task is popped and returned (one row).
If mode = "all", then all the tasks are returned in a tibble
with one row per task (or NULL is returned if there are no
tasks to pop).
Method wait()
Wait for tasks.
Usage
crew_class_controller_group$wait(
  mode = "all",
  seconds_interval = 0.5,
  seconds_timeout = Inf,
  scale = TRUE,
  throttle = TRUE,
  controllers = NULL
)
Arguments
mode: Character of length 1: "all" to wait for all tasks in all controllers to complete, "one" to wait for a single task in a single controller to complete. In this scheme, the timeout limit is applied to each controller sequentially, and a timeout is treated the same as a completed controller.
seconds_interval: Number of seconds to interrupt the wait in order to scale up workers as needed.
seconds_timeout: Timeout length in seconds waiting for results to become available.
scale: Logical of length 1, whether to call scale_later() on each selected controller to schedule auto-scaling. See the scale argument of the wait() method of ordinary single controllers.
throttle: TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
controllers: Character vector of controller names. Set to NULL to select all controllers.
Details
The wait() method blocks the calling R session and
repeatedly auto-scales workers for tasks that need them.
The function runs until it either times out or the condition
in mode is met.
Returns
A logical of length 1, invisibly. TRUE if the condition
in mode was met, FALSE otherwise.
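The blocking behavior and return value described above resemble a polling loop with a timeout. The following base R sketch shows that general pattern only; wait_for() and its condition argument are hypothetical names, and the sketch does not perform any auto-scaling or talk to workers.

```r
# Hypothetical sketch (not crew's implementation) of a blocking wait that
# polls a condition until it is met or a timeout elapses.
wait_for <- function(condition, seconds_interval = 0.5, seconds_timeout = Inf) {
  start <- proc.time()[["elapsed"]]
  repeat {
    if (isTRUE(condition())) {
      return(invisible(TRUE)) # the condition was met
    }
    if (proc.time()[["elapsed"]] - start >= seconds_timeout) {
      return(invisible(FALSE)) # timed out first
    }
    # A real controller would use this pause to auto-scale workers.
    Sys.sleep(seconds_interval)
  }
}

# Usage: the condition becomes true on the third poll.
polls <- 0
met <- wait_for(
  condition = function() {
    polls <<- polls + 1
    polls >= 3
  },
  seconds_interval = 0.01
)

# Usage: a condition that never becomes true runs into the timeout.
timed_out <- wait_for(
  condition = function() FALSE,
  seconds_interval = 0.01,
  seconds_timeout = 0.05
)
```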
Method push_backlog()
Push the name of a task to the backlog.
Usage
crew_class_controller_group$push_backlog(name, controller = NULL)
Arguments
name: Character of length 1 with the task name to push to the backlog.
controller: Character vector of length 1 with the controller name. Set to NULL to select the default controller that push_backlog() would choose.
Details
pop_backlog() pops the tasks that can be pushed
without saturating the controller.
Returns
NULL (invisibly).
Method pop_backlog()
Pop the task names from the head of the backlog which can be pushed without saturating the controller.
Usage
crew_class_controller_group$pop_backlog(controllers = NULL)
Arguments
controllers: Character vector of controller names. Set to NULL to select all controllers.
Returns
Character vector of task names which can be pushed to the
controller without saturating it. If the controller is saturated,
character(0L) is returned.
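One way to picture the backlog behavior is as a split between the task names that fit into free worker slots and the names that must keep waiting. The sketch below assumes that the number of free slots is the maximum number of workers minus the number of unresolved tasks, which follows from the definition of saturation but is an assumption about the exact rule. The helper pop_backlog_sketch() is hypothetical and not part of crew.

```r
# Hypothetical sketch (not crew's implementation): split a backlog into
# the names that fit in the free worker slots and the names that must wait.
pop_backlog_sketch <- function(backlog, unresolved, workers) {
  # Assumed rule: free slots = maximum workers minus unresolved tasks.
  slots <- max(workers - unresolved, 0L)
  fits <- seq_along(backlog) <= slots
  list(popped = backlog[fits], remaining = backlog[!fits])
}

backlog <- c("task_a", "task_b", "task_c")

# Two free slots: the first two names come off the head of the backlog.
partial <- pop_backlog_sketch(backlog, unresolved = 1L, workers = 3L)

# Saturated: nothing can be pushed, so nothing is popped.
saturated <- pop_backlog_sketch(backlog, unresolved = 3L, workers = 3L)
```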
Method summary()
Summarize the workers of one or more controllers.
Usage
crew_class_controller_group$summary(controllers = NULL)
Arguments
controllers: Character vector of controller names. Set to NULL to select all controllers.
Returns
A data frame of aggregated worker summary statistics
of all the selected controllers. It has one row per worker,
and the rows are grouped by controller.
See the documentation of the summary() method of the controller
class for specific information about the columns in the output.
Method terminate()
Terminate the workers and disconnect the client for one or more controllers.
Usage
crew_class_controller_group$terminate(controllers = NULL)
Arguments
controllers: Character vector of controller names. Set to NULL to select all controllers.
Returns
NULL (invisibly).
See Also
Other controller_group:
crew_controller_group()
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  # Create two local controllers and combine them into a group.
  persistent <- crew_controller_local(name = "persistent")
  transient <- crew_controller_local(
    name = "transient",
    tasks_max = 1L
  )
  group <- crew_controller_group(persistent, transient)
  group$start()
  # Submit a task to the "transient" controller, wait, then retrieve it.
  group$push(name = "task", command = sqrt(4), controller = "transient")
  group$wait()
  group$pop()
  group$terminate()
}