| Queue {ipc} | R Documentation |
A Class containing a producer and consumer
Description
Creates a Queue object for inter-process communication.
Its members producer and consumer are the main entry points for
sending and receiving messages respectively.
Usage
queue(
source = defaultSource()$new(),
producer = Producer$new(source),
consumer = Consumer$new(source)
)
Arguments
source |
The source for reading and writing the queue |
producer |
The producer for the source |
consumer |
The consumer of the source |
Details
This function creates a queue object for communication between different R processes,
including forks of the same process. By default, it uses txtq backage as its backend.
Technically, the information is sent through temporary files, created in a new directory
inside the session-specific temporary folder (see tempfile).
This requires that the new directory is writeable, this is normally the case but
if Sys.umask forbids writing, the communication fails with an error.
Public fields
producerA Producer object
consumera Consumer object.
Methods
Public methods
Method new()
Create a Queue object
Usage
Queue$new(source, prod, cons)
Arguments
sourceThe source to use for communication.
prodA Producer object.
consA Consumer object.
Method destroy()
clean up object after use.
Usage
Queue$destroy()
Method clone()
The objects of this class are cloneable with this method.
Usage
Queue$clone(deep = FALSE)
Arguments
deepWhether to make a deep clone.
Examples
## Not run:
library(parallel)
library(future)
library(promises)
plan(multisession)
q <- queue()
# communicate from main session to child
fut <- future({
for(i in 1:1000){
Sys.sleep(.1)
q$consumer$consume()
}
})
q$producer$fireEval(stop("Stop that child"))
cat(try(value(fut)))
# Communicate from child to main session
j <- 0
fut <- future({
for(i in 1:10){
Sys.sleep(.2)
# set j in the main thread substituting i into the expression
q$producer$fireEval(j <- i, env=list(i=i))
}
})
while(j < 10){
q$consumer$consume() # collect and execute assignments
cat("j = ", j, "\n")
Sys.sleep(.1)
}
fut <- future({
for(i in 1:10){
Sys.sleep(.2)
# set j in the main thread substituting i into the expression
q$producer$fireEval(print(i), env=list(i=i))
}
})
q$consumer$start() # execute `comsume` at regular intervals
# clean up
q$destroy()
## End(Not run)