| crew_class_launcher {crew} | R Documentation |
Launcher abstract class
Description
R6 abstract class to build other subclasses
which launch and manage workers.
Active bindings
workersData frame of worker information.
nameName of the launcher.
seconds_intervalSee
crew_launcher().seconds_timeoutSee
crew_launcher().seconds_launchSee
crew_launcher().seconds_idleSee
crew_launcher().seconds_wallSee
crew_launcher().tasks_maxSee
crew_launcher().tasks_timersSee
crew_launcher().reset_globalsSee
crew_launcher().reset_packagesSee
crew_launcher().reset_optionsSee
crew_launcher().garbage_collectionSee
crew_launcher().launch_maxSee
crew_launcher().tlsSee
crew_launcher().processesSee
crew_launcher(). asynchronously.asyncA
crew_async()object to run low-level launcher tasks asynchronously.throttleA
crew_throttle()object to throttle scaling.
Methods
Public methods
Method new()
Launcher constructor.
Usage
crew_class_launcher$new( name = NULL, seconds_interval = NULL, seconds_timeout = NULL, seconds_launch = NULL, seconds_idle = NULL, seconds_wall = NULL, seconds_exit = NULL, tasks_max = NULL, tasks_timers = NULL, reset_globals = NULL, reset_packages = NULL, reset_options = NULL, garbage_collection = NULL, launch_max = NULL, tls = NULL, processes = NULL )
Arguments
nameSee
crew_launcher().seconds_intervalSee
crew_launcher().seconds_timeoutSee
crew_launcher().seconds_launchSee
crew_launcher().seconds_idleSee
crew_launcher().seconds_wallSee
crew_launcher().seconds_exitSee
crew_launcher().tasks_maxSee
crew_launcher().tasks_timersSee
crew_launcher().reset_globalsSee
crew_launcher().reset_packagesSee
crew_launcher().reset_optionsSee
crew_launcher().garbage_collectionSee
crew_launcher().launch_maxSee
crew_launcher().tlsSee
crew_launcher().processesSee
crew_launcher().
Returns
An R6 object with the launcher.
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
client$start()
launcher <- crew_launcher_local(name = client$name)
launcher$start(workers = client$workers)
launcher$launch(index = 1L)
m <- mirai::mirai("result", .compute = client$name)
Sys.sleep(0.25)
m$data
client$terminate()
}
Method validate()
Validate the launcher.
Usage
crew_class_launcher$validate()
Returns
NULL (invisibly).
Method set_name()
Set the name of the launcher.
Usage
crew_class_launcher$set_name(name)
Arguments
nameCharacter of length 1, name to set for the launcher.
Returns
NULL (invisibly).
Method settings()
List of arguments for mirai::daemon().
Usage
crew_class_launcher$settings(socket)
Arguments
socketCharacter of length 1, websocket address of the worker to launch.
Returns
List of arguments for mirai::daemon().
Method call()
Create a call to crew_worker() to
help create custom launchers.
Usage
crew_class_launcher$call(socket, launcher, worker, instance)
Arguments
socketSocket where the worker will receive tasks.
launcherCharacter of length 1, name of the launcher.
workerPositive integer of length 1, index of the worker. This worker index remains the same even when the current instance of the worker exits and a new instance launches.
instanceCharacter of length 1 to uniquely identify the instance of the worker.
Returns
Character of length 1 with a call to crew_worker().
Examples
launcher <- crew_launcher_local() launcher$call( socket = "ws://127.0.0.1:5000/3/cba033e58", launcher = "launcher_a", worker = 3L, instance = "cba033e58" )
Method start()
Start the launcher.
Usage
crew_class_launcher$start(sockets = NULL)
Arguments
socketsFor testing purposes only.
Details
Creates the workers data frame. Meant to be called once at the beginning of the launcher life cycle, after the client has started.
Returns
NULL (invisibly).
Method terminate()
Terminate the whole launcher, including all workers.
Usage
crew_class_launcher$terminate()
Returns
NULL (invisibly).
Method summary()
Summarize the workers.
Usage
crew_class_launcher$summary()
Returns
NULL if the launcher is not started. Otherwise, a tibble
with one row per crew worker and the following columns:
-
worker: integer index of the worker. -
launches: number of times the worker was launched. Each launch occurs at a different websocket because the token at the end of the URL is rotated before each new launch. -
online: logical vector, whether the current instance of each worker was actively connected to its NNG socket during the time of the last call totally(). -
discovered: logical vector, whether the current instance of each worker had connected to its NNG socket at some point (and then possibly disconnected) during the time of the last call totally(). -
assigned: cumulative number of tasks assigned, reported bymirai::daemons()and summed over all completed instances of the worker. Does not reflect the activity of the currently running instance of the worker. -
complete: cumulative number of tasks completed, reported bymirai::daemons()and summed over all completed instances of the worker. Does not reflect the activity of the currently running instance of the worker. -
socket: current websocket URL of the worker.
Method tally()
Update the daemons-related columns of the internal
workers data frame.
Usage
crew_class_launcher$tally(daemons = NULL)
Arguments
daemonsmiraidaemons matrix. For testing only. Users should not set this.
Returns
NULL (invisibly).
Method unlaunched()
Get indexes of unlaunched workers.
Usage
crew_class_launcher$unlaunched(n = Inf)
Arguments
nMaximum number of worker indexes to return.
Details
A worker is "unlaunched" if it has never connected
to the current instance of its websocket. Once a worker
launches with the launch() method, it is considered "launched"
until it disconnects and its websocket is rotated with rotate().
Returns
Integer index of workers available for launch. The backlogged workers are listed first. A worker is backlogged if it is assigned more tasks than it completed.
Method booting()
Get workers that may still be booting up.
Usage
crew_class_launcher$booting()
Details
A worker is "booting" if its launch time is within the last
seconds_launch seconds. seconds_launch is a configurable grace
period when crew allows a worker to start up and connect to the
mirai dispatcher. The booting() function does not know about the
actual worker connection status, it just knows about launch times,
so it may return TRUE for workers that have already connected
and started doing tasks.
Method active()
Get active workers.
Usage
crew_class_launcher$active()
Details
A worker is "active" if its current instance is online and
connected, or if it is within its booting time window
and has never connected.
In other words, "active" means online | (!discovered & booting).
Returns
Logical vector with TRUE for active workers and FALSE for
inactive ones.
Method done()
Get done workers.
Usage
crew_class_launcher$done()
Details
A worker is "done" if it is launched and inactive.
A worker is "launched" if launch() was called
and the worker websocket has not been rotated since.
Returns
Integer index of inactive workers.
Method rotate()
Usage
crew_class_launcher$rotate()
Details
Rotate websockets at all unlaunched workers.
Returns
NULL (invisibly).
Method launch()
Launch a worker.
Usage
crew_class_launcher$launch(index)
Arguments
indexPositive integer of length 1, index of the worker to launch.
Returns
NULL (invisibly).
Method forward()
Forward an asynchronous launch/termination error condition of a worker.
Usage
crew_class_launcher$forward(index, condition = "error")
Arguments
indexInteger of length 1, index of the worker to inspect.
conditionCharacter of length 1 indicating what to do with an error if found.
"error"to throw an error,"warning"to throw a warning,"message"to print a message, and"character"to return a character vector of specific task-level error messages. The return value isNULLif no error is found.
Returns
Throw an error, throw a warning, or return a character string,
depending on the condition argument.
Method errors()
Collect and return the most recent error messages from asynchronous worker launching and termination.
Usage
crew_class_launcher$errors()
Returns
Character vector of all the most recent error messages
from asynchronous worker launching and termination. NULL
if there are no errors.
Method wait()
Wait for any local asynchronous launch or termination tasks to complete.
Usage
crew_class_launcher$wait()
Details
Only relevant if processes is a positive integer.
Returns
NULL (invisibly).
Method scale()
Auto-scale workers out to meet the demand of tasks.
Usage
crew_class_launcher$scale(demand, throttle = TRUE)
Arguments
demandNumber of unresolved tasks.
throttleTRUEto skip auto-scaling if it already happened within the lastseconds_intervalseconds.FALSEto auto-scale every timescale()is called. Throttling avoids overburdening themiraidispatcher and other resources.
Returns
NULL (invisibly)
Method launch_worker()
Abstract worker launch method.
Usage
crew_class_launcher$launch_worker(call, name, launcher, worker, instance)
Arguments
callCharacter of length 1 with a namespaced call to
crew_worker()which will run in the worker and accept tasks.nameCharacter of length 1 with an informative worker name.
launcherCharacter of length 1, name of the launcher.
workerPositive integer of length 1, index of the worker. This worker index remains the same even when the current instance of the worker exits and a new instance launches. It is always between 1 and the maximum number of concurrent workers.
instanceCharacter of length 1 to uniquely identify the current instance of the worker a the index in the launcher.
Details
Launcher plugins will overwrite this method.
Returns
A handle to mock the worker launch.
Method terminate_worker()
Abstract worker termination method.
Usage
crew_class_launcher$terminate_worker(handle)
Arguments
handleA handle object previously returned by
launch_worker()which allows the termination of the worker.
Details
Launcher plugins will overwrite this method.
Returns
A handle to mock worker termination.
Method terminate_workers()
Terminate one or more workers.
Usage
crew_class_launcher$terminate_workers(index = NULL)
Arguments
indexInteger vector of the indexes of the workers to terminate. If
NULL, all current workers are terminated.
Returns
NULL (invisibly).
See Also
Other launcher:
crew_launcher()
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
client$start()
launcher <- crew_launcher_local(name = client$name)
launcher$start(workers = client$workers)
launcher$launch(index = 1L)
m <- mirai::mirai("result", .compute = client$name)
Sys.sleep(0.25)
m$data
client$terminate()
}
## ------------------------------------------------
## Method `crew_class_launcher$new`
## ------------------------------------------------
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
client$start()
launcher <- crew_launcher_local(name = client$name)
launcher$start(workers = client$workers)
launcher$launch(index = 1L)
m <- mirai::mirai("result", .compute = client$name)
Sys.sleep(0.25)
m$data
client$terminate()
}
## ------------------------------------------------
## Method `crew_class_launcher$call`
## ------------------------------------------------
launcher <- crew_launcher_local()
launcher$call(
socket = "ws://127.0.0.1:5000/3/cba033e58",
launcher = "launcher_a",
worker = 3L,
instance = "cba033e58"
)