crew_class_launcher {crew} | R Documentation |
Launcher abstract class
Description
R6 abstract class to build other subclasses which launch and manage workers.
Active bindings
workers
Data frame of worker information.
name
Name of the launcher.
seconds_interval
See crew_launcher().
seconds_timeout
See crew_launcher().
seconds_launch
See crew_launcher().
seconds_idle
See crew_launcher().
seconds_wall
See crew_launcher().
tasks_max
See crew_launcher().
tasks_timers
See crew_launcher().
reset_globals
See crew_launcher().
reset_packages
See crew_launcher().
reset_options
See crew_launcher().
garbage_collection
See crew_launcher().
launch_max
See crew_launcher().
tls
See crew_launcher().
processes
See crew_launcher().
async
A crew_async() object to run low-level launcher tasks asynchronously.
throttle
A crew_throttle() object to throttle scaling.
Methods
Public methods
Method new()
Launcher constructor.
Usage
crew_class_launcher$new(
  name = NULL,
  seconds_interval = NULL,
  seconds_timeout = NULL,
  seconds_launch = NULL,
  seconds_idle = NULL,
  seconds_wall = NULL,
  seconds_exit = NULL,
  tasks_max = NULL,
  tasks_timers = NULL,
  reset_globals = NULL,
  reset_packages = NULL,
  reset_options = NULL,
  garbage_collection = NULL,
  launch_max = NULL,
  tls = NULL,
  processes = NULL
)
Arguments
name
See crew_launcher().
seconds_interval
See crew_launcher().
seconds_timeout
See crew_launcher().
seconds_launch
See crew_launcher().
seconds_idle
See crew_launcher().
seconds_wall
See crew_launcher().
seconds_exit
See crew_launcher().
tasks_max
See crew_launcher().
tasks_timers
See crew_launcher().
reset_globals
See crew_launcher().
reset_packages
See crew_launcher().
reset_options
See crew_launcher().
garbage_collection
See crew_launcher().
launch_max
See crew_launcher().
tls
See crew_launcher().
processes
See crew_launcher().
Returns
An R6 object with the launcher.
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  client <- crew_client()
  client$start()
  launcher <- crew_launcher_local(name = client$name)
  launcher$start(workers = client$workers)
  launcher$launch(index = 1L)
  m <- mirai::mirai("result", .compute = client$name)
  Sys.sleep(0.25)
  m$data
  client$terminate()
}
Method validate()
Validate the launcher.
Usage
crew_class_launcher$validate()
Returns
NULL (invisibly).
Method set_name()
Set the name of the launcher.
Usage
crew_class_launcher$set_name(name)
Arguments
name
Character of length 1, name to set for the launcher.
Returns
NULL (invisibly).
Method settings()
List of arguments for mirai::daemon().
Usage
crew_class_launcher$settings(socket)
Arguments
socket
Character of length 1, websocket address of the worker to launch.
Returns
List of arguments for mirai::daemon().
Method call()
Create a call to crew_worker() to help create custom launchers.
Usage
crew_class_launcher$call(socket, launcher, worker, instance)
Arguments
socket
Socket where the worker will receive tasks.
launcher
Character of length 1, name of the launcher.
worker
Positive integer of length 1, index of the worker. This worker index remains the same even when the current instance of the worker exits and a new instance launches.
instance
Character of length 1 to uniquely identify the instance of the worker.
Returns
Character of length 1 with a call to crew_worker().
Examples
launcher <- crew_launcher_local()
launcher$call(
  socket = "ws://127.0.0.1:5000/3/cba033e58",
  launcher = "launcher_a",
  worker = 3L,
  instance = "cba033e58"
)
Method start()
Start the launcher.
Usage
crew_class_launcher$start(sockets = NULL)
Arguments
sockets
For testing purposes only.
Details
Creates the workers data frame. Meant to be called once at the beginning of the launcher life cycle, after the client has started.
Returns
NULL (invisibly).
Method terminate()
Terminate the whole launcher, including all workers.
Usage
crew_class_launcher$terminate()
Returns
NULL (invisibly).
Method summary()
Summarize the workers.
Usage
crew_class_launcher$summary()
Returns
NULL if the launcher is not started. Otherwise, a tibble with one row per crew worker and the following columns:
- worker: integer index of the worker.
- launches: number of times the worker was launched. Each launch occurs at a different websocket because the token at the end of the URL is rotated before each new launch.
- online: logical vector, whether the current instance of each worker was actively connected to its NNG socket during the time of the last call to tally().
- discovered: logical vector, whether the current instance of each worker had connected to its NNG socket at some point (and then possibly disconnected) during the time of the last call to tally().
- assigned: cumulative number of tasks assigned, reported by mirai::daemons() and summed over all completed instances of the worker. Does not reflect the activity of the currently running instance of the worker.
- complete: cumulative number of tasks completed, reported by mirai::daemons() and summed over all completed instances of the worker. Does not reflect the activity of the currently running instance of the worker.
- socket: current websocket URL of the worker.
Method tally()
Update the daemons-related columns of the internal workers data frame.
Usage
crew_class_launcher$tally(daemons = NULL)
Arguments
daemons
mirai daemons matrix. For testing only. Users should not set this.
Returns
NULL (invisibly).
Method unlaunched()
Get indexes of unlaunched workers.
Usage
crew_class_launcher$unlaunched(n = Inf)
Arguments
n
Maximum number of worker indexes to return.
Details
A worker is "unlaunched" if it has never connected to the current instance of its websocket. Once a worker launches with the launch() method, it is considered "launched" until it disconnects and its websocket is rotated with rotate().
Returns
Integer index of workers available for launch. The backlogged workers are listed first. A worker is backlogged if it is assigned more tasks than it completed.
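The ordering rule above can be illustrated with a small base R sketch. It does not call crew: the `workers` data frame, its `launched`, `assigned`, and `complete` columns, and the helper `unlaunched_sketch()` are hypothetical stand-ins, not the launcher's actual internals.

```r
# Hypothetical stand-in for the launcher's internal workers data frame.
workers <- data.frame(
  worker = 1:4,
  launched = c(TRUE, FALSE, FALSE, FALSE),
  assigned = c(5L, 0L, 3L, 2L),
  complete = c(5L, 0L, 1L, 2L)
)

# Return up to n indexes of unlaunched workers, backlogged workers first.
unlaunched_sketch <- function(workers, n = Inf) {
  candidates <- workers[!workers$launched, , drop = FALSE]
  # A worker is backlogged if it was assigned more tasks than it completed.
  backlogged <- candidates$assigned > candidates$complete
  # order() on the negated flag puts backlogged workers first
  # and keeps ties in their original worker order.
  ordered <- candidates$worker[order(!backlogged)]
  ordered[seq_len(min(n, length(ordered)))]
}

index_all <- unlaunched_sketch(workers)
index_one <- unlaunched_sketch(workers, n = 1)
```

In this toy data, worker 3 is the only backlogged unlaunched worker, so it is listed ahead of workers 2 and 4.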
Method booting()
Get workers that may still be booting up.
Usage
crew_class_launcher$booting()
Details
A worker is "booting" if its launch time is within the last seconds_launch seconds. seconds_launch is a configurable grace period when crew allows a worker to start up and connect to the mirai dispatcher. The booting() method does not know the actual worker connection status; it only knows launch times, so it may return TRUE for workers that have already connected and started doing tasks.
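As a rough illustration of the time-window rule, the base R sketch below flags workers by launch time alone. The variable names, the use of NA for a worker that never launched, and the strict `<` comparison are all assumptions made for the example; the actual crew internals may differ.

```r
# Hypothetical launch times in seconds on a shared clock.
now <- 100
launch_time <- c(95, 60, NA)  # NA: this worker was never launched (assumption)
seconds_launch <- 30

# A worker counts as booting if it launched within the last seconds_launch seconds.
# Connection status is deliberately ignored, as in the description above.
booting_sketch <- function(launch_time, now, seconds_launch) {
  elapsed <- now - launch_time
  !is.na(elapsed) & elapsed < seconds_launch
}

booting <- booting_sketch(launch_time, now, seconds_launch)
```

Only the first worker, launched 5 seconds ago, falls inside the 30-second grace period.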
Method active()
Get active workers.
Usage
crew_class_launcher$active()
Details
A worker is "active" if its current instance is online and connected, or if it is within its booting time window and has never connected. In other words, "active" means online | (!discovered & booting).
Returns
Logical vector with TRUE for active workers and FALSE for inactive ones.
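The expression online | (!discovered & booting) can be checked on toy vectors. The values below are made up for illustration and cover one worker per case.

```r
# One hypothetical worker per case:
# 1: online, 2: booting and never connected,
# 3: connected earlier but now offline, 4: grace period over and never connected.
online <- c(TRUE, FALSE, FALSE, FALSE)
discovered <- c(TRUE, FALSE, TRUE, FALSE)
booting <- c(TRUE, TRUE, TRUE, FALSE)

# Same expression as in the description above.
active <- online | (!discovered & booting)
```

Worker 3 is inactive even though it is still inside its booting window, because it already connected once and then dropped offline.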
Method done()
Get done workers.
Usage
crew_class_launcher$done()
Details
A worker is "done" if it is launched and inactive. A worker is "launched" if launch() was called and the worker websocket has not been rotated since.
Returns
Integer index of inactive workers.
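Combining the two definitions, a minimal sketch of the "done" rule on hypothetical status vectors looks like this. The vector names are illustrative and are not fields of the launcher.

```r
# Hypothetical status flags for four workers.
launched <- c(TRUE, TRUE, FALSE, TRUE)
active <- c(TRUE, FALSE, FALSE, FALSE)

# "Done" means launched and inactive; which() converts the flags to integer indexes.
done <- which(launched & !active)
```

Worker 3 is inactive but was never launched, so it is not reported as done.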
Method rotate()
Usage
crew_class_launcher$rotate()
Details
Rotate websockets at all unlaunched workers.
Returns
NULL (invisibly).
Method launch()
Launch a worker.
Usage
crew_class_launcher$launch(index)
Arguments
index
Positive integer of length 1, index of the worker to launch.
Returns
NULL (invisibly).
Method forward()
Forward an asynchronous launch/termination error condition of a worker.
Usage
crew_class_launcher$forward(index, condition = "error")
Arguments
index
Integer of length 1, index of the worker to inspect.
condition
Character of length 1 indicating what to do with an error if found. "error" to throw an error, "warning" to throw a warning, "message" to print a message, and "character" to return a character vector of specific task-level error messages. The return value is NULL if no error is found.
Returns
Throw an error, throw a warning, print a message, or return a character vector, depending on the condition argument.
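The way the condition argument selects a behavior can be sketched in base R with switch(). The function `forward_sketch()` and its `messages` argument are hypothetical; the sketch takes the error messages directly instead of inspecting a worker, and it is not the crew implementation.

```r
# Hypothetical helper: dispatch on `condition` given already-collected messages.
forward_sketch <- function(messages, condition = "error") {
  if (length(messages) == 0L) {
    return(NULL)  # no error found
  }
  text <- paste(messages, collapse = "; ")
  switch(
    condition,
    error = stop(text, call. = FALSE),
    warning = warning(text, call. = FALSE),
    message = message(text),
    character = messages,
    stop("unsupported condition: ", condition, call. = FALSE)
  )
}

none <- forward_sketch(character(0))
as_text <- forward_sketch("launch failed", condition = "character")
caught <- tryCatch(
  forward_sketch("launch failed", condition = "error"),
  error = function(e) conditionMessage(e)
)
```

Returning the messages as a character vector is convenient when the caller wants to log or aggregate failures without interrupting the session.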
Method errors()
Collect and return the most recent error messages from asynchronous worker launching and termination.
Usage
crew_class_launcher$errors()
Returns
Character vector of all the most recent error messages from asynchronous worker launching and termination. NULL if there are no errors.
Method wait()
Wait for any local asynchronous launch or termination tasks to complete.
Usage
crew_class_launcher$wait()
Details
Only relevant if processes is a positive integer.
Returns
NULL (invisibly).
Method scale()
Auto-scale workers out to meet the demand of tasks.
Usage
crew_class_launcher$scale(demand, throttle = TRUE)
Arguments
demand
Number of unresolved tasks.
throttle
TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
Returns
NULL (invisibly).
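The throttling idea can be sketched with a small closure in base R. This is not the crew_throttle() implementation: `make_throttle()` is a hypothetical helper, and it takes the current time as an explicit `now` argument so the example is deterministic.

```r
# Hypothetical throttle: allow an action at most once per seconds_interval.
make_throttle <- function(seconds_interval) {
  last <- -Inf  # time of the most recent allowed action
  function(now) {
    if (now - last < seconds_interval) {
      return(FALSE)  # too soon: skip this round of auto-scaling
    }
    last <<- now
    TRUE
  }
}

poll <- make_throttle(seconds_interval = 0.5)
# Requests arrive at 0, 0.2, 0.6, and 0.7 seconds.
decisions <- c(poll(0), poll(0.2), poll(0.6), poll(0.7))
```

Under these assumptions, the requests at 0.2 and 0.7 seconds are skipped because each arrives less than 0.5 seconds after the previous allowed request.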
Method launch_worker()
Abstract worker launch method.
Usage
crew_class_launcher$launch_worker(call, name, launcher, worker, instance)
Arguments
call
Character of length 1 with a namespaced call to crew_worker() which will run in the worker and accept tasks.
name
Character of length 1 with an informative worker name.
launcher
Character of length 1, name of the launcher.
worker
Positive integer of length 1, index of the worker. This worker index remains the same even when the current instance of the worker exits and a new instance launches. It is always between 1 and the maximum number of concurrent workers.
instance
Character of length 1 to uniquely identify the current instance of the worker at the index in the launcher.
Details
Launcher plugins will overwrite this method.
Returns
A handle to mock the worker launch.
Method terminate_worker()
Abstract worker termination method.
Usage
crew_class_launcher$terminate_worker(handle)
Arguments
handle
A handle object previously returned by launch_worker() which allows the termination of the worker.
Details
Launcher plugins will overwrite this method.
Returns
A handle to mock worker termination.
Method terminate_workers()
Terminate one or more workers.
Usage
crew_class_launcher$terminate_workers(index = NULL)
Arguments
index
Integer vector of the indexes of the workers to terminate. If NULL, all current workers are terminated.
Returns
NULL (invisibly).
See Also
Other launcher: crew_launcher()
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  client <- crew_client()
  client$start()
  launcher <- crew_launcher_local(name = client$name)
  launcher$start(workers = client$workers)
  launcher$launch(index = 1L)
  m <- mirai::mirai("result", .compute = client$name)
  Sys.sleep(0.25)
  m$data
  client$terminate()
}