snowdoop, filechunkname, etc. {partools} — R Documentation
Snowdoop.
Description
“Snowdoop”: Utilities for distributed file storage, access and related operations.
Usage
filechunkname(basenm,ndigs,nodenum=NULL)
filesort(cls,infilenm,colnum,outdfnm,infiledst=FALSE,
ndigs=0,nsamp=1000,header=FALSE,sep="",usefread=FALSE)
filesplit(nch,basenm,header=FALSE,seqnums=FALSE)
filesplitrand(cls,fname,newbasename,ndigs,header=FALSE,sep)
fileshuffle(inbasename, nout, outbasename, header = FALSE)
linecount(infile,header=FALSE,chunksize=100000)
filecat(cls, basenm, header = FALSE)
readnscramble(cls,basenm,header=FALSE,sep= " ")
filesave(cls,dname,newbasename,ndigs,sep)
fileread(cls,fname,dname,ndigs,header=FALSE,sep=" ",usefread=FALSE)
getnumdigs(nch)
fileagg(fnames,ynames,xnames,header=FALSE,sep= " ",FUN,FUN1=FUN)
dfileagg(cls,fnames,ynames,xnames,header=FALSE,sep=" ",FUN,FUN1=FUN)
filegetrows(fnames,tmpdataexpr,header=FALSE,sep=" ")
dfilegetrows(cls,fnames,tmpdataexpr,header=FALSE,sep=" ")
Arguments
cls: A cluster for the parallel package.
nch: Number of chunks for the file split.
basenm: A chunked file name, minus suffix.
infile: Name of a nonchunked file.
ndigs: Number of digits in the chunked file name suffix.
nodenum: If non-NULL, get the name of the file chunk of the given cluster node, rather than that of the invoking node.
infilenm: Name of the input file (without suffix, if distributed).
outdfnm: Name of the output file (without suffix).
infiledst: If TRUE, infilenm is distributed.
colnum: Column number on which the sort will be done. It is assumed that this data column is free of NAs.
usefread: If TRUE, read files with fread from the data.table package rather than with read.table.
nsamp: Number of records to sample in each file chunk to determine bins for the bucket sort.
header: TRUE if the file chunks have headers.
seqnums: TRUE if the file chunks will have sequence numbers.
sep: Field delimiter used in reading or writing the files.
chunksize: Number of lines to read at a time, for efficient I/O.
dname: Quoted name of a distributed data frame or matrix. For fileread, the name under which the data read in will be stored.
fname: Quoted name of a distributed file.
fnames: Character vector of file names.
newbasename: Quoted name of the prefix of a distributed file, e.g. 'testa' for chunk files testa.1, testa.2, ...
ynames: Vector of quoted names of the variables on which FUN is to be applied.
xnames: Vector of quoted names of variables to be used for cell definition.
tmpdataexpr: Expression involving a data frame tmpdata; see Details.
FUN: First-level aggregation function.
FUN1: Second-level aggregation function.
inbasename: Basename of the input files, e.g. x for x.1, x.2, ...
outbasename: Basename of the output files.
nout: Number of output files.
Details
Use filesplit to convert a single file into a distributed one, with nch chunks. The file header, if present, will be retained in the chunks. If seqnums is TRUE, each line in a chunk will be preceded by the line number it had in the original file. The reverse operation to filesplit is performed by filecat, which converts a distributed file into a single one.
The fileagg function does an out-of-memory, multifile version of aggregate, reading the specified files one at a time and returning a grand aggregation. The function dfileagg partitions the specified group of files to a partools cluster, has each node call fileagg, and again aggregates the results.
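As a minimal sketch of the non-distributed fileagg (the chunk file names mt.1 and mt.2 are made up for this illustration, and no cluster is needed):

```r
# sketch: write mtcars out as two hypothetical CSV chunks,
# then aggregate across both files without reading them together
write.csv(mtcars[1:16, ], "mt.1", row.names = FALSE)
write.csv(mtcars[17:32, ], "mt.2", row.names = FALSE)
# sum mpg and hp within each cyl group, across the two files;
# FUN1 defaults to FUN, so sums of per-file sums are taken
fileagg(c("mt.1", "mt.2"), c("mpg", "hp"), "cyl",
        header = TRUE, sep = ",", FUN = "sum")
```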
The function filegetrows reads in the files in fnames, one at a time, naming the resulting in-memory data tmpdata each time. (It is assumed that the data fit in memory.) The function applies the user command tmpdataexpr to tmpdata, producing a subset of tmpdata. All of these subsets are combined using rbind, yielding the return value. The paired function dfilegetrows is a distributed wrapper for filegetrows, just as dfileagg is for fileagg.
Use filesort to do a file sort, with the input file being either distributed or ordinary, placing the result as a distributed data frame/matrix in the memories of the cluster nodes. The first nsamp records are read from the file and used to form one quantile range for each cluster node. Each node then reads the input file, retaining the records in its assigned range, and sorts them. This results in the input file being sorted, in memory, in a distributed manner across the nodes, under the specified name. At present, this utility is not very efficient.
Operations such as ca need i.i.d. data. If the original file storage was ordered on some variable, one needs to randomize the data first. There are several options:

- readnscramble: This produces a distributed data frame/matrix under the name basenm. Note that a record in chunk i of the distributed file will likely end up in chunk j of the distributed data frame/matrix, with j different from i.

- filesplitrand: Use this if you wish to directly produce a randomized distributed file from a monolithic one. It will read the file into memory and chunk it at the cluster nodes, each of which will save its chunk to disk.

- fileshuffle: If you need to avoid reading big files into memory, use this. You must run filesplit first, and then run fileshuffle several times for a good shuffle. Note that this function is also useful if your cluster size changes: a distributed file of m chunks can be converted to one with n chunks, either more or fewer than before.
If you wish to use this same randomized data in a future session, you can save it as a distributed file by calling filesave. Of course, this function is also useful if one wishes to save a distributed data frame or matrix that was created computationally rather than read from a distributed file. To go the other direction, i.e. to read a distributed file into memory, use fileread.
Some of the functions here are useful mainly as intermediate operations for the others:

- The function filechunkname returns the name of the file chunk for the calling cluster node.

- The linecount function returns the number of lines in a text file.

- A call to getnumdigs returns the number of digits in a distributed file name suffix.
Author(s)
Norm Matloff
Examples
cls <- makeCluster(2)
setclsinfo(cls)
# example of filesplit()
# make test input file
m <- rbind(1:2,3:4,5:6)
write.table(m,"m",row.names=FALSE,col.names=FALSE)
# apply the function
filesplit(2,"m",seqnums=TRUE)
# file m.1 and m.2 created, with contents c(1,1,2) and
# rbind(c(2,3,4),c(3,5,6)), respectively
# check it
read.table("m.1",header=FALSE,row.names=1)
read.table("m.2",header=FALSE,row.names=1)
m
# example of filecat(); assumes filesplit() example above already done
# delete file m so we can make sure we are re-creating it
unlink("m")
filecat(cls,"m")
# check that file m is back
read.table("m",row.names=1)
# example of filesave(), fileread()
# make test distributed data frame
clusterEvalQ(cls,x <- data.frame(u = runif(5),v = runif(5)))
# apply filesave()
filesave(cls,'x','xfile',1,' ')
# check it
fileread(cls,'xfile','xx',1,header=TRUE,sep=' ')
clusterEvalQ(cls,xx)
clusterEvalQ(cls,x)
# example of filesort()
# make test distributed input file
m1 <- matrix(c(5,12,13,3,4,5,8,8,8,1,2,3,6,5,4),byrow=TRUE,ncol=3)
m2 <- matrix(c(0,22,88,44,5,5,2,6,10,7,7,7),byrow=TRUE,ncol=3)
write.table(m1,"m.1",row.names=FALSE)
write.table(m2,"m.2",row.names=FALSE)
# sort on column 2 and check result
filesort(cls,"m",2,"msort",infiledst=TRUE,ndigs=1,nsamp=3,header=TRUE)
clusterEvalQ(cls,msort) # data should be sorted on V2
# check by comparing to input
m1
m2
m <- rbind(m1,m2)
write.table(m,"m",row.names=FALSE)
clusterEvalQ(cls,rm(msort))
filesort(cls,"m",2,"msort",infiledst=FALSE,nsamp=3,header=TRUE)
clusterEvalQ(cls,msort) # data should be sorted on V2
# example of readnscramble()
co2 <- head(CO2,25)
write.table(co2,"co2",row.names=FALSE) # creates file 'co2'
filesplit(2,"co2",header=TRUE) # creates files 'co2.1', 'co2.2'
readnscramble(cls,"co2",header=TRUE) # now have distrib. d.f.
# save the scrambled version to disk
filesave(cls,'co2','co2s',1,sep=',')
# example of fileshuffle()
# make test file, 'test'
cat('a','bc','def','i','j','k',file='test',sep='\n')
filesplit(2,'test') # creates files 'test.1','test.2'
fileshuffle('test',2,'testa') # creates shuffled files 'testa.1','testa.2'
# example of filechunkname()
clusterEvalQ(cls,filechunkname("x",3)) # returns "x.001", "x.002"
# example of getnumdigs()
getnumdigs(156) # should be 3
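# example of linecount(); applied here to the file 'm' written above,
# though any text file name could be used
linecount("m")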
# examples of filesave() and fileread()
mtc <- mtcars
distribsplit(cls,"mtc")
# save distributed data frame to distributed file
filesave(cls,'mtc','ctm',1,',')
# read it back in to a new distributed data frame
fileread(cls,'ctm','ctmnew',1,header=TRUE,sep=',')
# check it
clusterEvalQ(cls,ctmnew)
# try dfileagg() on it (not same as distribagg())
dfileagg(cls,c('ctm.1','ctm.2'),c("mpg","disp","hp"),c("cyl","gear"),header=TRUE,sep=",","max")
# check
aggregate(cbind(mpg,disp,hp) ~ cyl+gear,data=mtcars,FUN=max)
# extract the records with 4 cylinders and 4 gears (again, different
# from distribgetrows())
cmd <- 'tmpdata[tmpdata$cyl == 4 & tmpdata$gear == 4,]'
dfilegetrows(cls,c('ctm.1','ctm.2'),cmd,header=TRUE,sep=',')
# check
mtc[mtc$cyl == 4 & mtc$gear == 4,]
stopCluster(cls)