hive_stream {hive} | R Documentation |
Hadoop Streaming with package hive
Description
High-level R function for using Hadoop Streaming.
Usage
hive_stream( mapper, reducer, input, output, henv = hive(),
mapper_args = NULL, reducer_args = NULL, cmdenv_arg = NULL,
streaming_args = NULL)
Arguments
mapper |
a function which is executed on each worker node. The so-called mapper typically maps input key/value pairs to a set of intermediate key/value pairs. |
reducer |
a function which is executed on each worker node. The so-called reducer reduces a set of intermediate values which share a key to a smaller set of values. If no reducer is used, leave this argument empty. |
input |
specifies the directory holding the data in the DFS. |
output |
specifies the output directory in the DFS containing the results after the streaming job has finished. |
henv |
Hadoop local environment. |
mapper_args |
additional arguments to the mapper. |
reducer_args |
additional arguments to the reducer. |
cmdenv_arg |
additional arguments passed as environment variables to distributed tasks. |
streaming_args |
additional arguments passed to the Hadoop
Streaming utility. By default, only the number of reducers will be
set using |
Details
The function hive_stream() starts a MapReduce job on the given data located on the HDFS.
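The key/value flow of such a job can be tried out locally before anything is submitted to a cluster. The following is a minimal sketch, not part of package hive: the helper names map_lines(), shuffle() and reduce_records() are hypothetical, and sort() merely stands in for the sorting by key that Hadoop Streaming performs between the map and reduce phases.

```r
# Local, Hadoop-free sketch of the map -> shuffle/sort -> reduce flow.
# All function names here are hypothetical and not part of package hive.

# Map step: emit one "term<TAB>1" record per space-separated term.
map_lines <- function(lines) {
  terms <- unlist(strsplit(lines, " ", fixed = TRUE))
  terms <- terms[nchar(terms) > 0]
  if (length(terms) == 0L) return(character(0))
  paste(terms, 1L, sep = "\t")
}

# Shuffle step: Hadoop Streaming sorts the intermediate records by key
# before handing them to the reducer; sort() stands in for that here.
shuffle <- function(records) sort(records)

# Reduce step: sum the counts of all records that share a key.
reduce_records <- function(records) {
  if (length(records) == 0L) return(integer(0))
  parts  <- strsplit(records, "\t", fixed = TRUE)
  keys   <- vapply(parts, `[`, character(1), 1L)
  values <- as.integer(vapply(parts, `[`, character(1), 2L))
  vapply(split(values, keys), sum, integer(1))
}

input  <- c("to be or not to be", "to do")
counts <- reduce_records(shuffle(map_lines(input)))
print(counts)
```

The result is a named integer vector with one entry per distinct term. On a real cluster the three steps run in separate processes that communicate through stdin and stdout, which is why the mapper and reducer passed to hive_stream() read from "stdin" rather than taking data as arguments.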
Author(s)
Stefan Theussl
References
Apache Hadoop Streaming (https://hadoop.apache.org/docs/current/hadoop-streaming/HadoopStreaming.html).
Examples
## A simple word count example
## Put some xml files on the HDFS:
## Not run: DFS_put( system.file("defaults/core/", package = "hive"),
"/tmp/input" )
## End(Not run)
## Not run: DFS_put( system.file("defaults/hdfs/hdfs-default.xml", package = "hive"),
"/tmp/input" )
## End(Not run)
## Not run: DFS_put( system.file("defaults/mapred/mapred-default.xml", package = "hive"),
"/tmp/input" )
## End(Not run)
## Define the mapper and reducer function to be applied:
## Note that a Hadoop map or reduce job retrieves data line by line from stdin.
## Not run:
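mapper <- function(x){
    ## Read the input split line by line from stdin
    con <- file( "stdin", open = "r" )
    on.exit( close(con) )
    while (length(line <- readLines(con, n = 1L, warn = FALSE)) > 0) {
        terms <- unlist(strsplit(line, " "))
        ## Keep only terms with more than one character
        terms <- terms[nchar(terms) > 1 ]
        ## Emit one "term<TAB>1" record per term
        if( length(terms) )
            cat( paste(terms, 1, sep = "\t"), sep = "\n")
    }
}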
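reducer <- function(x){
    ## The environment serves as a hash table mapping term -> count
    env <- new.env( hash = TRUE )
    con <- file( "stdin", open = "r" )
    on.exit( close(con) )
    while (length(line <- readLines(con, n = 1L, warn = FALSE)) > 0) {
        ## Each input record has the form "term<TAB>count"
        keyvalue <- unlist( strsplit(line, "\t") )
        if( exists(keyvalue[1], envir = env, inherits = FALSE) ){
            ## Known term: add the count to the running total
            assign( keyvalue[1], get(keyvalue[1], envir = env) + as.integer(keyvalue[2]),
                    envir = env )
        } else {
            ## New term: initialize its total
            assign( keyvalue[1], as.integer(keyvalue[2]), envir = env )
        }
    }
    ## Write one "term<TAB>total" record per term
    env <- as.list(env)
    for( term in names(env) )
        writeLines( paste(c(term, env[[term]]), collapse ="\t") )
}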
hive_set_nreducer(1)
hive_stream( mapper = mapper, reducer = reducer, input = "/tmp/input", output = "/tmp/output" )
DFS_list("/tmp/output")
head( DFS_read_lines("/tmp/output/part-00000") )
## End(Not run)
## Don't forget to clean up the file system
## Not run: DFS_dir_remove("/tmp/input")
## Not run: DFS_dir_remove("/tmp/output")
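
The reducer in the example above collects all terms in an environment before writing anything. Because Hadoop Streaming delivers the intermediate records to each reducer sorted by key, a reducer may instead sum consecutive runs of equal keys and keep only the current key in memory. The following is a minimal sketch of that alternative under this sorted-input assumption; reduce_sorted() is a hypothetical helper, not part of package hive, and it takes a connection argument so that it can be checked locally without Hadoop.

```r
# Hypothetical helper (not part of package hive): a reducer that only
# keeps the current key in memory, assuming records arrive sorted by key.
reduce_sorted <- function(con, out = stdout()) {
  current <- NULL   # key of the run currently being summed
  total   <- 0L     # running count for that key
  while (length(line <- readLines(con, n = 1L, warn = FALSE)) > 0) {
    keyvalue <- unlist(strsplit(line, "\t", fixed = TRUE))
    if (length(keyvalue) < 2L) next          # skip malformed records
    if (!identical(keyvalue[1], current)) {
      # A new key starts: flush the finished run first.
      if (!is.null(current))
        writeLines(paste(current, total, sep = "\t"), out)
      current <- keyvalue[1]
      total   <- 0L
    }
    total <- total + as.integer(keyvalue[2])
  }
  # Flush the last run, if any input was seen.
  if (!is.null(current))
    writeLines(paste(current, total, sep = "\t"), out)
  invisible(NULL)
}

# Local check on an in-memory connection instead of stdin.
con <- textConnection(c("be\t2", "be\t1", "to\t1", "to\t1", "to\t1"))
result <- capture.output(reduce_sorted(con))
close(con)
print(result)
```

To try this in a streaming job, one possible wrapper would be a reducer function that opens file("stdin", open = "r") and passes the connection to reduce_sorted(). The trade-off is lower memory use in exchange for relying on the sort order of the input.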