crew_class_launcher {crew}    R Documentation

Launcher abstract class

Description

R6 abstract class to build other subclasses which launch and manage workers.

Active bindings

workers

Data frame of worker information.

name

Name of the launcher.

seconds_interval

See crew_launcher().

seconds_timeout

See crew_launcher().

seconds_launch

See crew_launcher().

seconds_idle

See crew_launcher().

seconds_wall

See crew_launcher().

tasks_max

See crew_launcher().

tasks_timers

See crew_launcher().

reset_globals

See crew_launcher().

reset_packages

See crew_launcher().

reset_options

See crew_launcher().

garbage_collection

See crew_launcher().

launch_max

See crew_launcher().

tls

See crew_launcher().

processes

See crew_launcher().

async

A crew_async() object to run low-level launcher tasks asynchronously.

throttle

A crew_throttle() object to throttle scaling.

Methods

Public methods


Method new()

Launcher constructor.

Usage
crew_class_launcher$new(
  name = NULL,
  seconds_interval = NULL,
  seconds_timeout = NULL,
  seconds_launch = NULL,
  seconds_idle = NULL,
  seconds_wall = NULL,
  seconds_exit = NULL,
  tasks_max = NULL,
  tasks_timers = NULL,
  reset_globals = NULL,
  reset_packages = NULL,
  reset_options = NULL,
  garbage_collection = NULL,
  launch_max = NULL,
  tls = NULL,
  processes = NULL
)
Arguments
name

See crew_launcher().

seconds_interval

See crew_launcher().

seconds_timeout

See crew_launcher().

seconds_launch

See crew_launcher().

seconds_idle

See crew_launcher().

seconds_wall

See crew_launcher().

seconds_exit

See crew_launcher().

tasks_max

See crew_launcher().

tasks_timers

See crew_launcher().

reset_globals

See crew_launcher().

reset_packages

See crew_launcher().

reset_options

See crew_launcher().

garbage_collection

See crew_launcher().

launch_max

See crew_launcher().

tls

See crew_launcher().

processes

See crew_launcher().

Returns

An R6 object with the launcher.

Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
client$start()
launcher <- crew_launcher_local(name = client$name)
launcher$start(workers = client$workers)
launcher$launch(index = 1L)
m <- mirai::mirai("result", .compute = client$name)
Sys.sleep(0.25)
m$data
client$terminate()
}

Method validate()

Validate the launcher.

Usage
crew_class_launcher$validate()
Returns

NULL (invisibly).


Method set_name()

Set the name of the launcher.

Usage
crew_class_launcher$set_name(name)
Arguments
name

Character of length 1, name to set for the launcher.

Returns

NULL (invisibly).


Method settings()

List of arguments for mirai::daemon().

Usage
crew_class_launcher$settings(socket)
Arguments
socket

Character of length 1, websocket address of the worker to launch.

Returns

List of arguments for mirai::daemon().


Method call()

Create a call to crew_worker() to help create custom launchers.

Usage
crew_class_launcher$call(socket, launcher, worker, instance)
Arguments
socket

Socket where the worker will receive tasks.

launcher

Character of length 1, name of the launcher.

worker

Positive integer of length 1, index of the worker. This worker index remains the same even when the current instance of the worker exits and a new instance launches.

instance

Character of length 1 to uniquely identify the instance of the worker.

Returns

Character of length 1 with a call to crew_worker().

Examples
launcher <- crew_launcher_local()
launcher$call(
  socket = "ws://127.0.0.1:5000/3/cba033e58",
  launcher = "launcher_a",
  worker = 3L,
  instance = "cba033e58"
)

Method start()

Start the launcher.

Usage
crew_class_launcher$start(sockets = NULL)
Arguments
sockets

For testing purposes only.

Details

Creates the workers data frame. Meant to be called once at the beginning of the launcher life cycle, after the client has started.

Returns

NULL (invisibly).


Method terminate()

Terminate the whole launcher, including all workers.

Usage
crew_class_launcher$terminate()
Returns

NULL (invisibly).


Method summary()

Summarize the workers.

Usage
crew_class_launcher$summary()
Returns

NULL if the launcher is not started. Otherwise, a tibble with one row per crew worker.


Method tally()

Update the daemons-related columns of the internal workers data frame.

Usage
crew_class_launcher$tally(daemons = NULL)
Arguments
daemons

mirai daemons matrix. For testing only. Users should not set this.

Returns

NULL (invisibly).


Method unlaunched()

Get indexes of unlaunched workers.

Usage
crew_class_launcher$unlaunched(n = Inf)
Arguments
n

Maximum number of worker indexes to return.

Details

A worker is "unlaunched" if it has never connected to the current instance of its websocket. Once a worker launches with the launch() method, it is considered "launched" until it disconnects and its websocket is rotated with rotate().

Returns

Integer index of workers available for launch. The backlogged workers are listed first. A worker is backlogged if it is assigned more tasks than it completed.
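
The ordering described above can be sketched in base R. This is an illustration only: pick_unlaunched() and its launched, assigned, and complete vectors are hypothetical stand-ins, not crew's internal code or data.

```r
# Hypothetical helper: return indexes of unlaunched workers,
# backlogged ones first, capped at n entries.
pick_unlaunched <- function(launched, assigned, complete, n = Inf) {
  index <- which(!launched)
  # A worker is backlogged if it was assigned more tasks than it completed.
  backlogged <- assigned[index] > complete[index]
  ordered <- c(index[backlogged], index[!backlogged])
  ordered[seq_len(min(n, length(ordered)))]
}

launched <- c(TRUE, FALSE, FALSE, FALSE)
assigned <- c(5, 2, 4, 0)
complete <- c(5, 2, 3, 0)

all_unlaunched <- pick_unlaunched(launched, assigned, complete)
first_two <- pick_unlaunched(launched, assigned, complete, n = 2)
print(all_unlaunched)
print(first_two)
```

Worker 3 is listed first because it is the only unlaunched worker with unfinished assigned tasks.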


Method booting()

Get workers that may still be booting up.

Usage
crew_class_launcher$booting()
Details

A worker is "booting" if its launch time is within the last seconds_launch seconds. seconds_launch is a configurable grace period during which crew allows a worker to start up and connect to the mirai dispatcher. The booting() method does not know the actual connection status of a worker. It only knows launch times, so it may return TRUE for workers that have already connected and started doing tasks.
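
A minimal base R sketch of the booting rule, assuming launch times are stored as numeric seconds and NA marks a worker that was never launched. The is_booting() helper and the example values are hypothetical, not part of crew.

```r
# Hypothetical helper: TRUE for workers launched within the last
# seconds_launch seconds. Workers with an NA launch time are FALSE.
is_booting <- function(launched_at, now, seconds_launch) {
  elapsed <- now - launched_at
  !is.na(elapsed) & elapsed < seconds_launch
}

now <- 100
launched_at <- c(99, 60, NA)  # seconds; worker 3 was never launched
booting <- is_booting(launched_at, now, seconds_launch = 30)
print(booting)
```

Note that the result depends only on launch times, which is why a worker that has already connected can still count as booting.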


Method active()

Get active workers.

Usage
crew_class_launcher$active()
Details

A worker is "active" if its current instance is online and connected, or if it is within its booting time window and has never connected. In other words, "active" means online | (!discovered & booting).

Returns

Logical vector with TRUE for active workers and FALSE for inactive ones.
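
The rule online | (!discovered & booting) can be checked on a small example. The three state vectors below are hypothetical values chosen for illustration, not crew's internal representation.

```r
# Hypothetical per-worker state, one element per worker.
online     <- c(TRUE,  FALSE, FALSE, FALSE)
discovered <- c(TRUE,  FALSE, TRUE,  FALSE)
booting    <- c(FALSE, TRUE,  TRUE,  FALSE)

# Active: connected now, or still inside the boot window and never connected.
active <- online | (!discovered & booting)
print(active)
```

Worker 3 is inside its boot window but has connected before (discovered), so the grace period no longer keeps it active once it goes offline.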


Method done()

Get done workers.

Usage
crew_class_launcher$done()
Details

A worker is "done" if it is launched and inactive. A worker is "launched" if launch() was called and the worker websocket has not been rotated since.

Returns

Integer index of done workers, that is, workers that are launched and inactive.
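
As a rough base R illustration of "launched and inactive", assuming hypothetical logical vectors launched and active with one element per worker:

```r
# Hypothetical per-worker state, one element per worker.
launched <- c(TRUE, TRUE,  FALSE, TRUE)
active   <- c(TRUE, FALSE, FALSE, FALSE)

# Done: launch() was called, the websocket has not been rotated since,
# and the worker is no longer active.
done <- which(launched & !active)
print(done)
```

Worker 3 is inactive but was never launched, so it is not counted as done.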


Method rotate()

Usage
crew_class_launcher$rotate()
Details

Rotate websockets at all unlaunched workers.

Returns

NULL (invisibly).


Method launch()

Launch a worker.

Usage
crew_class_launcher$launch(index)
Arguments
index

Positive integer of length 1, index of the worker to launch.

Returns

NULL (invisibly).


Method forward()

Forward an asynchronous launch/termination error condition of a worker.

Usage
crew_class_launcher$forward(index, condition = "error")
Arguments
index

Integer of length 1, index of the worker to inspect.

condition

Character of length 1 indicating what to do with an error if found. "error" to throw an error, "warning" to throw a warning, "message" to print a message, and "character" to return a character vector of specific task-level error messages. The return value is NULL if no error is found.

Returns

Throws an error, throws a warning, prints a message, or returns a character vector, depending on the condition argument. NULL if no error is found.
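
The effect of the condition argument can be sketched with a small dispatcher. The forward_condition() helper below is hypothetical and only mirrors the documented options; it is not how crew implements forward().

```r
# Hypothetical helper mirroring the documented condition options.
forward_condition <- function(messages, condition = "error") {
  if (length(messages) == 0L) {
    return(NULL)  # nothing to forward
  }
  text <- paste(messages, collapse = "\n")
  switch(
    condition,
    error = stop(text, call. = FALSE),
    warning = warning(text, call. = FALSE),
    message = message(text),
    character = messages,
    stop("unsupported condition: ", condition, call. = FALSE)
  )
}

as_text <- forward_condition("worker 1 failed to launch", condition = "character")
none <- forward_condition(character(0), condition = "error")
caught <- tryCatch(
  forward_condition("worker 1 failed to launch", condition = "error"),
  error = function(e) conditionMessage(e)
)
```

Using "character" is convenient when collecting messages across workers, while "error" suits code that should stop at the first failed launch.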


Method errors()

Collect and return the most recent error messages from asynchronous worker launching and termination.

Usage
crew_class_launcher$errors()
Returns

Character vector of all the most recent error messages from asynchronous worker launching and termination. NULL if there are no errors.


Method wait()

Wait for any local asynchronous launch or termination tasks to complete.

Usage
crew_class_launcher$wait()
Details

Only relevant if processes is a positive integer.

Returns

NULL (invisibly).


Method scale()

Auto-scale workers out to meet the demand of tasks.

Usage
crew_class_launcher$scale(demand, throttle = TRUE)
Arguments
demand

Number of unresolved tasks.

throttle

TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.

Returns

NULL (invisibly).
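
The throttling idea can be illustrated with a small closure that allows an action at most once per seconds_interval. This is a sketch under stated assumptions: new_throttle() and its injectable clock argument are hypothetical and are not the crew_throttle() implementation.

```r
# Hypothetical throttle: the returned function gives TRUE when enough time
# has passed since the last allowed call, and FALSE otherwise.
new_throttle <- function(seconds_interval,
                         clock = function() as.numeric(Sys.time())) {
  last <- -Inf
  function() {
    now <- clock()
    if (now - last < seconds_interval) {
      return(FALSE)  # too soon: skip auto-scaling this time
    }
    last <<- now
    TRUE
  }
}

# A fake clock keeps the example deterministic.
fake_time <- 0
poll <- new_throttle(seconds_interval = 0.5, clock = function() fake_time)
first <- poll()
fake_time <- 0.2
second <- poll()
fake_time <- 0.7
third <- poll()
print(c(first, second, third))
```

With throttle = FALSE, the analogous behavior would be to skip this check and scale on every call.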


Method launch_worker()

Abstract worker launch method.

Usage
crew_class_launcher$launch_worker(call, name, launcher, worker, instance)
Arguments
call

Character of length 1 with a namespaced call to crew_worker() which will run in the worker and accept tasks.

name

Character of length 1 with an informative worker name.

launcher

Character of length 1, name of the launcher.

worker

Positive integer of length 1, index of the worker. This worker index remains the same even when the current instance of the worker exits and a new instance launches. It is always between 1 and the maximum number of concurrent workers.

instance

Character of length 1 to uniquely identify the current instance of the worker at the index in the launcher.

Details

Launcher plugins will overwrite this method.

Returns

A handle to mock the worker launch.


Method terminate_worker()

Abstract worker termination method.

Usage
crew_class_launcher$terminate_worker(handle)
Arguments
handle

A handle object previously returned by launch_worker() which allows the termination of the worker.

Details

Launcher plugins will overwrite this method.

Returns

A handle to mock worker termination.
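
As a sketch of how a launcher plugin might overwrite launch_worker() and terminate_worker(), the subclass below starts each worker as a local R process. It assumes the crew, R6, and processx packages are installed, and the class name custom_launcher_class is chosen here for illustration. The method signatures follow the Usage sections above and may differ in other crew versions.

```r
# Sketch only: define the subclass if the required packages are available.
if (requireNamespace("crew", quietly = TRUE) &&
    requireNamespace("R6", quietly = TRUE) &&
    requireNamespace("processx", quietly = TRUE)) {
  custom_launcher_class <- R6::R6Class(
    classname = "custom_launcher_class",
    inherit = crew::crew_class_launcher,
    public = list(
      # Run the crew_worker() call in a new local R process and
      # return the process object as the handle.
      launch_worker = function(call, name, launcher, worker, instance) {
        bin <- file.path(R.home("bin"), "R")
        processx::process$new(
          command = bin,
          args = c("-e", call),
          cleanup = FALSE
        )
      },
      # Use the handle returned by launch_worker() to stop the process.
      terminate_worker = function(handle) {
        handle$kill()
      }
    )
  )
}
```

The handle returned by launch_worker() is whatever terminate_worker() needs later, in this case a processx process object.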


Method terminate_workers()

Terminate one or more workers.

Usage
crew_class_launcher$terminate_workers(index = NULL)
Arguments
index

Integer vector of the indexes of the workers to terminate. If NULL, all current workers are terminated.

Returns

NULL (invisibly).

See Also

Other launcher: crew_launcher()

Examples

if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
client$start()
launcher <- crew_launcher_local(name = client$name)
launcher$start(workers = client$workers)
launcher$launch(index = 1L)
m <- mirai::mirai("result", .compute = client$name)
Sys.sleep(0.25)
m$data
client$terminate()
}

[Package crew version 0.9.5 Index]