snowdoop,filechunkname, etc... {partools}R Documentation

Snowdoop.

Description

“Snowdoop”: Utilities for distributed file storage, access and related operations.

Usage

filechunkname(basenm,ndigs,nodenum=NULL)
filesort(cls,infilenm,colnum,outdfnm,infiledst=FALSE,
   ndigs=0,nsamp=1000,header=FALSE,sep="",usefread=FALSE)
filesplit(nch,basenm,header=FALSE,seqnums=FALSE)
filesplitrand(cls,fname,newbasename,ndigs,header=FALSE,sep)
fileshuffle(inbasename, nout, outbasename, header = FALSE)
linecount(infile,header=FALSE,chunksize=100000)
filecat(cls, basenm, header = FALSE)  
readnscramble(cls,basenm,header=FALSE,sep= " ")
filesave(cls,dname,newbasename,ndigs,sep)
fileread(cls,fname,dname,ndigs,header=FALSE,sep=" ",usefread=FALSE)
getnumdigs(nch)
fileagg(fnames,ynames,xnames,header=FALSE,sep= " ",FUN,FUN1=FUN) 
dfileagg(cls,fnames,ynames,xnames,header=FALSE,sep=" ",FUN,FUN1=FUN) 
filegetrows(fnames,tmpdataexpr,header=FALSE,sep=" ") 
dfilegetrows(cls,fnames,tmpdataexpr,header=FALSE,sep=" ") 

Arguments

cls

A cluster for the parallel package.

nch

Number of chunks for the file split.

basenm

A chunked file name, minus suffix.

infile

Name of a nonchunked file.

ndigs

Number of digits in the chunked file name suffix.

nodenum

If non-NULL, get the name of the file chunk of cluster node nodenum; otherwise, get the name for the chunk associated with this node.

infilenm

Name of input file (without suffix, if distributed).

outdfnm

Name of output file (without suffix).

infiledst

If TRUE, infilenm is distributed.

colnum

Column number on which the sort will be done. It is assumed that this data column is free of NAs.

usefread

If true, use fread instead of read.table; generally much faster; requires data.table package.

nsamp

Number of records to sample in each file chunk to determine bins for the bucket sort.

header

TRUE if the file chunks have headers.

seqnums

TRUE if the file chunks will have sequence numbers.

sep

Field delimiter used in read.table.

chunksize

Number of lines to read at a time, for efficient I/O.

dname

Quoted name of a distributed data frame or matrix. For filesave, the object must have column names.

fname

Quoted name of a distributed file.

fnames

Character vector of file names.

newbasename

Quoted name of the prefix of a distributed file, e.g. xyz for a distributed file xyz.01, xyz.02 etc.

ynames

Vector of quoted names of variables on which FUN is to be applied.

xnames

Vector of quoted names of variables to be used for cell definition.

tmpdataexpr

Expression involving a data frame tmpdataexpr. See below.

FUN

First-level aggregation function.

FUN1

Second-level aggregation function.

inbasename

basename of the input files, e.g. x for x.1, x.2, ...

outbasename

basename of the output files

nout

number of output files

Details

Use filesplit to convert a single file into distributed one, with nch chunks. The file header, if present, will be retained in the chunks. If seqnums is TRUE, each line in a chunk will be preceded by the line number it had in the original file.

The reverse operation to filesplit is performed by filecat, which converts a distributed file into a single one.

The fileagg function does an out-of-memory, multifile version of aggregate, reading the specified files one at a time, and returning a grand aggregation. The function dfileagg partitions the specified group of files to a partools cluster, has each call fileagg, and again aggregates the results.

The function filegetrows reads in the files in fnames, one at a time, naming the resulting in-memory data tmpdata each time. (It is assumed that the data fit in memory.) The function applies the user command tmpdataexpr to tmpdata, producing a subset of tmpdata. All of these subsets are combined using rbind, yielding the return value. The paired function dfilegetrows is a distributed wrapper for filegetrows, just as dfileagg is for fileagg.

Use filesort to do a file sort, with the input file being either distributed or ordinary, placing the result as a distributed data frame/matrix in the memories of the cluster nodes. The first nsamp records are read from the file, and are used to form one quantile range for each cluster node. Each node then reads the input file, retaining the records in its assigned range, and sorts them. This results in the input file being sorted, in memory, in a distributed manner across nodes, under the specifid name. At present, this utility is not very efficient.

Operations such as ca need i.i.d. data. If the original file storage was ordered on some variable, one needs to randomize the data first. There are several options:

If you wish to use this same randomized data in a future session, you can save it as a distributed file by calling filesave. Of course, this function is also useful if one wishes to save a distributed data frame or matrix that was created computationally rather than from read from a distributed file. To go the other direction, i.e. read a distributed file, use fileread.

Some of the functions here are useful mainly as intermediate operations for the others:

Author(s)

Norm Matloff

Examples

cls <- makeCluster(2)
setclsinfo(cls)

# example of filesplit()
# make test input file
m <- rbind(1:2,3:4,5:6) 
write.table(m,"m",row.names=FALSE,col.names=FALSE) 
# apply the function
filesplit(2,"m",seqnums=TRUE)
# file m.1 and m.2 created, with contents c(1,1,2) and
# rbind(c(2,3,4),c(3,5,6)), respectively
# check it
read.table("m.1",header=FALSE,row.names=1)
read.table("m.2",header=FALSE,row.names=1)
m

# example of filecat(); assumes filesplit() example above already done
# delete file m so we can make sure we are re-creating it
unlink("m")
filecat(cls,"m")
# check that file m is back
read.table("m",row.names=1)

# example of filesave(), fileread()
# make test distributed data frame
clusterEvalQ(cls,x <- data.frame(u = runif(5),v = runif(5)))
# apply filesave()
filesave(cls,'x','xfile',1,' ')
# check it
fileread(cls,'xfile','xx',1,header=TRUE,sep=' ')
clusterEvalQ(cls,xx)
clusterEvalQ(cls,x)


# example of filesort()
# make test distributed input file
m1 <- matrix(c(5,12,13,3,4,5,8,8,8,1,2,3,6,5,4),byrow=TRUE,ncol=3)
m2 <- matrix(c(0,22,88,44,5,5,2,6,10,7,7,7),byrow=TRUE,ncol=3)
write.table(m1,"m.1",row.names=FALSE)
write.table(m2,"m.2",row.names=FALSE)
# sort on column 2 and check result
filesort(cls,"m",2,"msort",infiledst=TRUE,ndigs=1,nsamp=3,header=TRUE)
clusterEvalQ(cls,msort)  # data should be sorted on V2
# check by comparing to input
m1
m2
m <- rbind(m1,m2)
write.table(m,"m",row.names=FALSE)
clusterEvalQ(cls,rm(msort))
filesort(cls,"m",2,"msort",infiledst=FALSE,nsamp=3,header=TRUE)
clusterEvalQ(cls,msort)  # data should be sorted on V2

# example of readnscramble()
co2 <- head(CO2,25) 
write.table(co2,"co2",row.names=FALSE)  # creates file 'co2'
filesplit(2,"co2",header=TRUE)  # creates files 'co2.1', 'co2.2'
readnscramble(cls,"co2",header=TRUE)  # now have distrib. d.f.
# save the scrambled version to disk
filesave(cls,'co2','co2s',1,sep=',')

# example of fileshuffle()
# make test file, 'test'
cat('a','bc','def','i','j','k',file='test',sep='\n')
filesplit(2,'test')  # creates files 'test.1','test.2'
fileshuffle('test',2,'testa')  # creates shuffled files 'testa.1','testa.2'

# example of filechunkname()
clusterEvalQ(cls,filechunkname("x",3))  # returns "x.001", "x.002"

# example of getnumdigs()
getnumdigs(156)  # should be 3

# examples of filesave() and fileread()
mtc <- mtcars
distribsplit(cls,"mtc")
# save distributed data frame to distributed file
filesave(cls,'mtc','ctm',1,',') 
# read it back in to a new distributed data frame
fileread(cls,'ctm','ctmnew',1,header=TRUE,sep=',') 
# check it
clusterEvalQ(cls,ctmnew) 
# try dfileagg() on it (not same as distribagg())
dfileagg(cls,c('ctm.1','ctm.2'),c("mpg","disp","hp"),c("cyl","gear"),header=TRUE,sep=",","max")
# check
aggregate(cbind(mpg,disp,hp) ~ cyl+gear,data=mtcars,FUN=max)
# extract the records with 4 cylinders and 4 gears (again, different
# from distribgetrows())
cmd <- 'tmpdata[tmpdata$cyl == 4 & tmpdata$gear == 4,]'
dfilegetrows(cls,c('ctm.1','ctm.2'),cmd,header=TRUE,sep=',') 
# check
mtc[mtc$cyl == 4 & mtc$gear == 4,]

stopCluster(cls)



[Package partools version 1.1.6 Index]