MapReduce {DSL}	R Documentation

MapReduce for "DList" Objects
Description
Interface to apply functions on elements of "DList" objects.
Usage
DLapply( x, FUN, parallel, ..., keep = FALSE )
DMap( x, MAP, parallel, keep = FALSE )
DReduce( x, REDUCE, parallel, ... )
Arguments
x: a "DList" object.

FUN: the function to be applied to each element (i.e., the
     values) of x.

MAP: the function to be applied to each key/value pair in x.

REDUCE: the function to be applied to each key/value pair in x.

...: optional arguments to FUN.

parallel: logical; should the provided functions be applied in
     parallel?

keep: logical; should the current data be kept as a separate
     revision for further processing later? Default: FALSE.
Details
The MapReduce programming model as defined by Dean and Ghemawat (2008)
is as follows: the computation takes a set of input key/value pairs, and produces a
set of output key/value pairs. The user expresses the computation as two
functions: Map and Reduce. The Map function takes an input pair and produces a set of
intermediate key/value pairs. The Reduce function accepts an
intermediate key and a set of values for that key (possibly
grouped by the MapReduce library). It merges these values
together to form a possibly smaller set of values. Typically, just zero or
one output value is produced per reduce invocation. Furthermore, data is
usually stored on a (distributed) file system which is recognized by the
MapReduce library. This allows such a framework to handle lists of
values (here objects of class "DList") that are too large to fit
into main memory (i.e., RAM).
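The model described above can be sketched in plain, in-memory R. This toy word count is illustrative only and does not use DSL's distributed machinery; the input data and variable names are made up for the example:

```r
## Toy, in-memory sketch of the MapReduce model (illustrative only --
## DSL's DMap()/DReduce() operate on chunks held in a DStorage backend).
input <- list(list(key = "doc1", value = "a b a"),
              list(key = "doc2", value = "b c"))

## Map: each input key/value pair emits intermediate (term, 1) pairs.
intermediate <- unlist(lapply(input, function(kv) {
    lapply(strsplit(kv$value, " ")[[1]],
           function(term) list(key = term, value = 1L))
}), recursive = FALSE)

## Shuffle: group the intermediate values by their key.
grouped <- split(vapply(intermediate, `[[`, integer(1), "value"),
                 vapply(intermediate, `[[`, character(1), "key"))

## Reduce: merge the set of values for each key into (here) one value.
counts <- lapply(grouped, sum)
counts[["a"]]  ## 2L
```

The Examples section below performs the same computation with DMap() and DReduce(), where the intermediate data lives in chunks on a (distributed) file system instead of a list in RAM.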
Value
A "DList" object.
References
J. Dean and S. Ghemawat (2008). MapReduce: Simplified Data Processing on Large Clusters. Communications of the ACM, 51, 107–113.
Examples
dl <- DList( line1 = "This is the first line.",
line2 = "Now, the second line." )
res <- DLapply( dl, function(x) unlist(strsplit(x, " ")) )
as.list( res )
foo <- function( keypair )
    list( key = paste("next_", keypair$key, sep = ""),
          value = gsub("first", "mapped", keypair$value) )
dlm <- DMap( x = dl, MAP = foo)
## retrieve keys
unlist(DGather(dlm, keys = TRUE, names = FALSE))
## retrieve values
as.list( dlm )
## simple wordcount based on two files:
dir(system.file("examples", package = "DSL"))
## first force 1 chunk per file (set max chunk size to 1 byte):
ds <- DStorage("LFS", tempdir(), chunksize = 1L)
## make "DList" from files, i.e., read contents and store in chunks
dl <- as.DList(system.file("examples", package = "DSL"), DStorage = ds)
## read files
dl <- DMap(dl, function( keypair ){
    list( key = keypair$key,
          value = tryCatch(readLines(keypair$value),
                           error = function(x) NA) )
})
## split into terms
splitwords <- function( keypair ){
    keys <- unlist(strsplit(keypair$value, " "))
    mapply( function(key, value) list( key = key, value = value ),
            keys, rep(1L, length(keys)),
            SIMPLIFY = FALSE, USE.NAMES = FALSE )
}
res <- DMap( dl, splitwords )
as.list(res)
## now aggregate by term
res <- DReduce( res, sum )
as.list( res )