RushWorker {rush}R Documentation

Rush Worker

Description

RushWorker evaluates tasks and writes results to the data base. The worker inherits from Rush.

Value

Object of class R6::R6Class and RushWorker with worker methods.

Super class

rush::Rush -> RushWorker

Public fields

worker_id

(character(1))
Identifier of the worker.

remote

(logical(1))
Whether the worker is on a remote machine.

heartbeat

('r_process“)
Background process for the heartbeat.

Active bindings

terminated

(logical(1))
Whether to shutdown the worker. Used in the worker loop to determine whether to continue.

terminated_on_idle

(logical(1))
Whether to shutdown the worker if no tasks are queued. Used in the worker loop to determine whether to continue.

Methods

Public methods

Inherited methods

Method new()

Creates a new instance of this R6 class.

Usage
RushWorker$new(
  network_id,
  config = NULL,
  remote,
  worker_id = NULL,
  heartbeat_period = NULL,
  heartbeat_expire = NULL,
  lgr_thresholds = NULL,
  lgr_buffer_size = 0,
  seed = NULL
)
Arguments
network_id

(character(1))
Identifier of the rush network. Controller and workers must have the same instance id. Keys in Redis are prefixed with the instance id.

config

(redux::redis_config)
Redis configuration options. If NULL, configuration set by rush_plan() is used. If rush_plan() has not been called, the REDIS_URL environment variable is parsed. If REDIS_URL is not set, a default configuration is used. See redux::redis_config for details.

remote

(logical(1))
Whether the worker is started on a remote machine. See Rush for details.

worker_id

(character(1))
Identifier of the worker. Keys in redis specific to the worker are prefixed with the worker id.

heartbeat_period

(integer(1))
Period of the heartbeat in seconds.

heartbeat_expire

(integer(1))
Time to live of the heartbeat in seconds.

lgr_thresholds

(named character() | named numeric())
Logger threshold on the workers e.g. c(rush = "debug").

lgr_buffer_size

(integer(1))
By default (lgr_buffer_size = 0), the log messages are directly saved in the Redis data store. If lgr_buffer_size > 0, the log messages are buffered and saved in the Redis data store when the buffer is full. This improves the performance of the logging.

seed

(integer())
Initial seed for the random number generator. Either a L'Ecuyer-CMRG seed (integer(7)) or a regular RNG seed (integer(1)). The later is converted to a L'Ecuyer-CMRG seed. If NULL, no seed is used for the random number generator.


Method push_running_tasks()

Push a task to running tasks without queue.

Usage
RushWorker$push_running_tasks(xss, extra = NULL)
Arguments
xss

(list of named list())
Lists of arguments for the function e.g. ⁠list(list(x1, x2), list(x1, x2)))⁠.

extra

(list)
List of additional information stored along with the task e.g. ⁠list(list(timestamp), list(timestamp)))⁠.

Returns

(character())
Keys of the tasks.


Method pop_task()

Pop a task from the queue. Task is moved to the running tasks.

Usage
RushWorker$pop_task(timeout = 1, fields = "xs")
Arguments
timeout

(numeric(1))
Time to wait for task in seconds.

fields

(character())
Fields to be returned.


Method push_results()

Pushes results to the data base.

Usage
RushWorker$push_results(keys, yss, extra = NULL)
Arguments
keys

(character(1))
Keys of the associated tasks.

yss

(named list())
List of lists of named results.

extra

(named list())
List of lists of additional information stored along with the results.


Method set_terminated()

Mark the worker as terminated. Last step in the worker loop before the worker terminates.

Usage
RushWorker$set_terminated()

Method clone()

The objects of this class are cloneable with this method.

Usage
RushWorker$clone(deep = FALSE)
Arguments
deep

Whether to make a deep clone.

Note

The worker registers itself in the data base of the rush network.

Examples

# This example is not executed since Redis must be installed

   config_local = redux::redis_config()
   rush = rsh(network_id = "test_network", config = config_local)

   fun = function(x1, x2, ...) list(y = x1 + x2)
   rush$start_local_workers(fun = fun)

   rush$stop_workers()


[Package rush version 0.1.1 Index]