Poll Functions {pbdZMQ} | R Documentation |
Poll Functions
Description
Functions for polling multiple ZMQ sockets.
Usage
zmq.poll(socket, type, timeout = -1L, MC = ZMQ.MC())
zmq.poll.free()
zmq.poll.length()
zmq.poll.get.revents(index = 1L)
Arguments
socket |
a vector of ZMQ sockets |
type |
a vector of poll types corresponding to the sockets in socket, see ZMQ.PO() |
timeout |
timeout for poll in milliseconds (-1L blocks until an event occurs), see ZeroMQ manual for details |
MC |
a message control, see ZMQ.MC() |
index |
an index of ZMQ poll items to obtain revents |
Details
zmq.poll() initializes ZMQ poll items from the given ZMQ sockets and ZMQ poll types. socket and type are vectors of the same length: socket contains the socket pointers and type contains the poll type for each socket. See ZMQ.PO() for the possible values of type. ZMQ defines several poll types and uses them to poll multiple sockets.
zmq.poll.free() frees the memory of the internal ZMQ poll structure.
zmq.poll.length() obtains the total number of ZMQ poll items.
zmq.poll.get.revents() obtains the revent type of the ZMQ poll item at the given index.
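The revent value is a bitmask, which is why the example below tests it with bitwAnd() rather than with ==. The following base R sketch illustrates the decoding step only. The flag values are the ones ZeroMQ defines for ZMQ_POLLIN, ZMQ_POLLOUT, and ZMQ_POLLERR; in real code they would come from ZMQ.PO(). The helper decode_revents() is hypothetical and not part of pbdZMQ.

```r
# Flag values as defined by ZeroMQ (ZMQ_POLLIN, ZMQ_POLLOUT, ZMQ_POLLERR).
# Hard-coded here so the sketch runs without pbdZMQ; normally use ZMQ.PO().
POLLIN  <- 1L
POLLOUT <- 2L
POLLERR <- 4L

# Hypothetical helper: return the names of all flags set in an revents bitmask.
decode_revents <- function(revents) {
  flags <- c(POLLIN = POLLIN, POLLOUT = POLLOUT, POLLERR = POLLERR)
  is.set <- vapply(flags,
                   function(f) bitwAnd(as.integer(revents), f) != 0L,
                   logical(1))
  names(flags)[is.set]
}

decode_revents(0L)   # no flag set: nothing is ready
decode_revents(1L)   # only POLLIN is set
decode_revents(3L)   # POLLIN and POLLOUT are both set
```

With pbdZMQ loaded, the same helper could be applied to the value returned by zmq.poll.get.revents() for each index from 1 to zmq.poll.length().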
Value
zmq.poll() returns a ZMQ code and an errno; see the ZeroMQ manual for details. The R function itself raises no error, warning, or interrupt, but some may be caught by the C function zmq_poll().
zmq.poll.length() returns the total number of poll items.
zmq.poll.get.revents() returns the revent type.
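As a minimal sketch of how these return values might be inspected, the code below polls a PUSH/PULL pair inside one process with a finite timeout. It assumes pbdZMQ is installed and that the underlying ZeroMQ build supports the inproc transport; the endpoint name "inproc://poll-sketch" is made up for illustration. The structure of the value returned by zmq.poll() is only printed with str(), not assumed.

```r
# Sketch only: runs its body when pbdZMQ is available, otherwise does nothing.
got.pollin <- NA
if (requireNamespace("pbdZMQ", quietly = TRUE)) {
  library(pbdZMQ, quietly = TRUE)

  ### Create a connected PULL/PUSH pair (endpoint name is hypothetical).
  context <- zmq.ctx.new()
  receiver <- zmq.socket(context, ZMQ.ST()$PULL)
  zmq.bind(receiver, "inproc://poll-sketch")
  sender <- zmq.socket(context, ZMQ.ST()$PUSH)
  zmq.connect(sender, "inproc://poll-sketch")

  ### Queue one message so that the receiver has something to read.
  zmq.send(sender, "hello")

  ### Wait at most 1000 ms instead of blocking (the default is timeout = -1L).
  ret <- zmq.poll(c(receiver, sender),
                  c(ZMQ.PO()$POLLIN, ZMQ.PO()$POLLOUT),
                  timeout = 1000L)
  str(ret)                                    # ZMQ code and errno
  cat("poll items:", zmq.poll.length(), "\n")

  ### Item 1 is the receiver; read only if POLLIN was reported.
  got.pollin <- bitwAnd(zmq.poll.get.revents(1L), ZMQ.PO()$POLLIN) != 0
  if (got.pollin) {
    msg <- zmq.recv(receiver)
    cat("received:", msg$buf, "\n")
  }

  ### Clean up.
  zmq.poll.free()
  zmq.close(sender)
  zmq.close(receiver)
  zmq.ctx.destroy(context)
}
```

A finite timeout lets a loop regain control when no socket is ready. In that case none of the revent flags is set and the receive step is skipped.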
Author(s)
Wei-Chen Chen wccsnow@gmail.com.
References
ZeroMQ/4.1.0 API Reference: https://libzmq.readthedocs.io/en/zeromq4-1/
Programming with Big Data in R Website: https://pbdr.org/
See Also
Examples
## Not run:
### Using poll pattern.
### See demo/mspoller.r for details.
### Run the following commands in the background or in another window.
### SHELL> Rscript wuserver.r &
### SHELL> Rscript taskvent.r &
### SHELL> Rscript mspoller.r
### mspoller.r contains the following.
library(pbdZMQ, quietly = TRUE)
### Initialize.
context <- zmq.ctx.new()
receiver <- zmq.socket(context, ZMQ.ST()$PULL)
zmq.connect(receiver, "tcp://localhost:5557")
subscriber <- zmq.socket(context, ZMQ.ST()$SUB)
zmq.connect(subscriber, "tcp://localhost:5556")
zmq.setsockopt(subscriber, ZMQ.SO()$SUBSCRIBE, "20993")
### Process messages from both sockets.
cat("Press Ctrl+C or Esc to stop mspoller.\n")
i.rec <- 0
i.sub <- 0
while(TRUE){
  ### Set poller.
  zmq.poll(c(receiver, subscriber),
           c(ZMQ.PO()$POLLIN, ZMQ.PO()$POLLIN))

  ### Check receiver.
  if(bitwAnd(zmq.poll.get.revents(1), ZMQ.PO()$POLLIN)){
    ret <- zmq.recv(receiver)
    if(ret$len != -1){
      cat("task ventilator:", ret$buf, "at", i.rec, "\n")
      i.rec <- i.rec + 1
    }
  }

  ### Check subscriber.
  if(bitwAnd(zmq.poll.get.revents(2), ZMQ.PO()$POLLIN)){
    ret <- zmq.recv(subscriber)
    if(ret$len != -1){
      cat("weather update:", ret$buf, "at", i.sub, "\n")
      i.sub <- i.sub + 1
    }
  }

  ### Stop after at least 5 messages from each socket.
  if(i.rec >= 5 && i.sub >= 5){
    break
  }

  Sys.sleep(runif(1, 0.5, 1))
}
### Finish.
zmq.poll.free()
zmq.close(receiver)
zmq.close(subscriber)
zmq.ctx.destroy(context)
## End(Not run)