Reading SequenceFiles with Spark can be very time-consuming. By default, SequenceFileInputFormat reuses FileInputFormat's splitting logic, so a large file is split into chunks of the HDFS block size. If you want smaller splits in order to increase the parallelism of the Spark job, you can write your own InputFormat:

import java.io.IOException
import java.util.{ArrayList, List}

import com.google.common.base.Stopwatch   // Guava Stopwatch bundled with Hadoop 2.x
import org.apache.commons.logging.{Log, LogFactory}
import org.apache.hadoop.fs.{BlockLocation, FileStatus, FileSystem, LocatedFileStatus, Path}
import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, SequenceFileRecordReader}

class MySequenceFileInputFormat[K, V] extends FileInputFormat[K, V] {
  private val LOG: Log = LogFactory.getLog(classOf[MySequenceFileInputFormat[K, V]])

  val sequenceFileBlockSize = 30000000           // manually set the "block size" used for splitting to 30 MB
  val SPLIT_SLOP: Double = 1.1                   // 10% slop
  val NUM_INPUT_FILES: String = "mapreduce.input.fileinputformat.numinputfiles"

  @throws[IOException]
  def createRecordReader(split: InputSplit, context: TaskAttemptContext): RecordReader[K, V] =
    new SequenceFileRecordReader[K, V]

  // SequenceFiles can only be split on sync-marker boundaries, so keep a sensible minimum
  override protected def getFormatMinSplitSize: Long = 2000L

  @throws[IOException]
  override protected def listStatus(job: JobContext): List[FileStatus] = {
    val files: List[FileStatus] = super.listStatus(job)
    val len: Int = files.size
    var j: Int = 0
    for (i <- 0 to len - 1) {
      val f = files.get(i)
      if (f.isDirectory) {
        // MapFile directory: use the "data" file inside it
        val pth: Path = f.getPath
        val fs: FileSystem = pth.getFileSystem(job.getConfiguration)
        files.set(i, fs.getFileStatus(new Path(pth, "data")))
      }
      if (files.get(i).getLen() != 0L) {
        files.set(j, files.get(i))
        j += 1
      }
    }
    files.subList(0, j)
  }

  @throws[IOException]
  override def getSplits(job: JobContext): List[InputSplit] = {
    val sw: Stopwatch = new Stopwatch().start()
    val minSize: Long = Math.max(getFormatMinSplitSize(), FileInputFormat.getMinSplitSize(job))
    val maxSize: Long = FileInputFormat.getMaxSplitSize(job)

    // generate splits
    val splits: ArrayList[InputSplit] = new ArrayList[InputSplit]
    val files: List[FileStatus] = listStatus(job)
    for (i <- 0 to files.size() - 1) {
      val file = files.get(i)
      val path: Path = file.getPath()
      val length: Long = file.getLen()
      if (length != 0) {
        var blkLocations: Array[BlockLocation] = null
        if (file.isInstanceOf[LocatedFileStatus]) {
          blkLocations = file.asInstanceOf[LocatedFileStatus].getBlockLocations()
        } else {
          val fs: FileSystem = path.getFileSystem(job.getConfiguration())
          blkLocations = fs.getFileBlockLocations(file, 0, length)
        }
        if (isSplitable(job, path)) {
          // val blockSize: Long = file.getBlockSize()
          val blockSize: Long = sequenceFileBlockSize   // use our own size instead of the HDFS block size
          val splitSize: Long = computeSplitSize(blockSize, minSize, maxSize)
          var bytesRemaining: Long = length
          while (bytesRemaining.toDouble / splitSize > SPLIT_SLOP) {
            val blkIndex: Int = getBlockIndex(blkLocations, length - bytesRemaining)
            splits.add(makeSplit(path, length - bytesRemaining, splitSize,
              blkLocations(blkIndex).getHosts(), blkLocations(blkIndex).getCachedHosts()))
            bytesRemaining -= splitSize
          }
          if (bytesRemaining != 0) {
            val blkIndex: Int = getBlockIndex(blkLocations, length - bytesRemaining)
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
              blkLocations(blkIndex).getHosts(), blkLocations(blkIndex).getCachedHosts()))
          }
        } else { // not splitable
          splits.add(makeSplit(path, 0, length,
            blkLocations(0).getHosts(), blkLocations(0).getCachedHosts()))
        }
      } else {
        // Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new Array[String](0)))
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size())
    sw.stop()
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size() +
        ", TimeTaken: " + sw.elapsedMillis())
    }
    splits
  }
}


Change sequenceFileBlockSize to whatever split size you want.
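
For reference, the effective split size comes out of FileInputFormat's computeSplitSize, which clamps the block size between the configured minimum and maximum split sizes. A small sketch of that calculation (shown only for illustration; the method itself is inherited, not redefined above):

// computeSplitSize, as inherited from FileInputFormat:
//   Math.max(minSize, Math.min(maxSize, blockSize))
//
// With sequenceFileBlockSize = 30000000, getFormatMinSplitSize = 2000 and the default
// mapreduce.input.fileinputformat.split.maxsize (Long.MaxValue), this yields ~30 MB splits:
val splitSize = Math.max(2000L, Math.min(Long.MaxValue, 30000000L))   // = 30000000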


Usage:

val dd = sc.newAPIHadoopFile[BytesWritable, BytesWritable,
    MySequenceFileInputFormat[BytesWritable, BytesWritable]](sourceDir)
  .flatMap(x => function(new String(x._2.getBytes)))
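
To confirm the smaller splits actually take effect, a minimal sketch is to check the partition count of the resulting RDD (the path below is a placeholder; sc is an existing SparkContext):

import org.apache.hadoop.io.BytesWritable

// placeholder for the SequenceFile directory on HDFS
val sourceDir = "hdfs:///path/to/sequencefiles"

val rdd = sc.newAPIHadoopFile[BytesWritable, BytesWritable,
  MySequenceFileInputFormat[BytesWritable, BytesWritable]](sourceDir)

// Each ~30 MB split becomes one RDD partition, so this should be noticeably larger
// than with the default HDFS-block-sized splits
println(rdd.partitions.length)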