今天就跟大家聊聊有关Spark SQL数据加载和保存的实例分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

一、前置知识详解 Spark SQL重要是操作DataFrame,DataFrame本身提供了save和load的操作, Load:可以创建DataFrame, Save:把DataFrame中的数据保存到文件或者说与具体的格式来指明我们要读取的文件的类型以及与具体的格式来指出我们要输出的文件是什么类型。

二、Spark SQL读写数据代码实战

importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importorg.apache.spark.api.java.function.Function;importorg.apache.spark.sql.*;importorg.apache.spark.sql.types.DataTypes;importorg.apache.spark.sql.types.StructField;importorg.apache.spark.sql.types.StructType;importjava.util.ArrayList;importjava.util.List;publicclassSparkSQLLoadSaveOps{publicstaticvoidmain(String[]args){SparkConfconf=newSparkConf().setMaster("local").setAppName("SparkSQLLoadSaveOps");JavaSparkContextsc=newJavaSparkContext(conf);SQLContext=newSQLContext(sc);/***read()是DataFrameReader类型,load可以将数据读取出来*/DataFramepeopleDF=sqlContext.read().format("json").load("E:\\Spark\\Sparkinstanll_package\\Big_Data_Software\\spark-1.6.0-bin-hadoop2.6\\examples\\src\\main\\resources\\people.json");/***直接对DataFrame进行操作*Json:是一种自解释的格式,读取Json的时候怎么判断其是什么格式?*通过扫描整个Json。扫描之后才会知道元数据*///通过mode来指定输出文件的是append。创建新文件来追加文件peopleDF.select("name").write().mode(SaveMode.Append).save("E:\\personNames");}}

读取过程源码分析如下: 1. read方法返回DataFrameReader,用于读取数据。

/***::Experimental::*Returnsa[[DataFrameReader]]thatcanbeusedtoreaddatainasa[[DataFrame]].*{{{*sqlContext.read.parquet("/path/to/file.parquet")*sqlContext.read.schema(schema).json("/path/to/file.json")*}}}**@groupgenericdata*@since1.4.0*/@Experimental//创建DataFrameReader实例,获得了DataFrameReader引用defread:DataFrameReader=newDataFrameReader(this)

2. 然后再调用DataFrameReader类中的format,指出读取文件的格式。

/***Specifiestheinputdatasourceformat.**@since1.4.0*/defformat(source:String):DataFrameReader={this.source=sourcethis}

3. 通过DtaFrameReader中load方法通过路径把传入过来的输入变成DataFrame。

/***Loadsinputinasa[[DataFrame]],fordatasourcesthatrequireapath(e.g.databackedby*alocalordistributedfilesystem).**@since1.4.0*///TODO:RemovethisoneinSpark2.0.defload(path:String):DataFrame={option("path",path).load()}

至此,数据的读取工作就完成了,下面就对DataFrame进行操作。 下面就是写操作!!!

1. 调用DataFrame中select函数进行对列筛选

/***Selectsasetofcolumns.Thisisavariantof`select`thatcanonlyselect*existingcolumnsusingcolumnnames(i.e.cannotconstructexpressions).**{{{*//Thefollowingtwoareequivalent:*df.select("colA","colB")*df.select($"colA",$"colB")*}}}*@groupdfops*@since1.3.0*/@scala.annotation.varargsdefselect(col:String,cols:String*):DataFrame=select((col+:cols).map(Column(_)):_*)

2. 然后通过write将结果写入到外部存储系统中。

/***::Experimental::*Interfaceforsavingthecontentofthe[[DataFrame]]outintoexternalstorage.**@groupoutput*@since1.4.0*/@Experimentaldefwrite:DataFrameWriter=newDataFrameWriter(this)

3. 在保持文件的时候mode指定追加文件的方式

/***Specifiesthebehaviorwhendataortablealreadyexists.Optionsinclude://Overwrite是覆盖*-`SaveMode.Overwrite`:overwritetheexistingdata.//创建新的文件,然后追加*-`SaveMode.Append`:appendthedata.*-`SaveMode.Ignore`:ignoretheoperation(i.e.no-op).*-`SaveMode.ErrorIfExists`:defaultoption,throwanexceptionatruntime.**@since1.4.0*/defmode(saveMode:SaveMode):DataFrameWriter={this.mode=saveModethis}

4. 最后,save()方法触发action,将文件输出到指定文件中。

/***Savesthecontentofthe[[DataFrame]]atthespecifiedpath.**@since1.4.0*/defsave(path:String):Unit={this.extraOptions+=("path"->path)save()}

三、Spark SQL读写整个流程图如下

四、对于流程中部分函数源码详解

DataFrameReader.Load()

1. Load()返回DataFrame类型的数据集合,使用的数据是从默认的路径读取。

/***ReturnsthedatasetstoredatpathasaDataFrame,*usingthedefaultdatasourceconfiguredbyspark.sql.sources.default.**@groupgenericdata*@deprecatedAsof1.4.0,replacedby`read().load(path)`.ThiswillberemovedinSpark2.0.*/@deprecated("Useread.load(path).ThiswillberemovedinSpark2.0.","1.4.0")defload(path:String):DataFrame={//此时的read就是DataFrameReaderread.load(path)}

2. 追踪load源码进去,源码如下:在DataFrameReader中的方法。Load()通过路径把输入传进来变成一个DataFrame。

/***Loadsinputinasa[[DataFrame]],fordatasourcesthatrequireapath(e.g.databackedby*alocalordistributedfilesystem).**@since1.4.0*///TODO:RemovethisoneinSpark2.0.defload(path:String):DataFrame={option("path",path).load()}

3. 追踪load源码如下:

/***Loadsinputinasa[[DataFrame]],fordatasourcesthatdon'trequireapath(e.g.external*key-valuestores).**@since1.4.0*/defload():DataFrame={//对传入的Source进行解析valresolved=ResolvedDataSource(sqlContext,userSpecifiedSchema=userSpecifiedSchema,partitionColumns=Array.empty[String],provider=source,options=extraOptions.toMap)DataFrame(sqlContext,LogicalRelation(resolved.relation))}

DataFrameReader.format()

1. Format:具体指定文件格式,这就获得一个巨大的启示是:如果是Json文件格式可以保持为Parquet等此类操作。 Spark SQL在读取文件的时候可以指定读取文件的类型。例如,Json,Parquet.

/***Specifiestheinputdatasourceformat.Built-inoptionsinclude“parquet”,”json”,etc.**@since1.4.0*/defformat(source:String):DataFrameReader={this.source=source//FileTypethis}

DataFrame.write()

1. 创建DataFrameWriter实例

/***::Experimental::*Interfaceforsavingthecontentofthe[[DataFrame]]outintoexternalstorage.**@groupoutput*@since1.4.0*/@Experimentaldefwrite:DataFrameWriter=newDataFrameWriter(this)1

2. 追踪DataFrameWriter源码如下:以DataFrame的方式向外部存储系统中写入数据。

/***::Experimental::*Interfaceusedtowritea[[DataFrame]]toexternalstoragesystems(e.g.filesystems,*key-valuestores,etc).Use[[DataFrame.write]]toaccessthis.**@since1.4.0*/@ExperimentalfinalclassDataFrameWriterprivate[sql](df:DataFrame){

DataFrameWriter.mode()

1. Overwrite是覆盖,之前写的数据全都被覆盖了。 Append:是追加,对于普通文件是在一个文件中进行追加,但是对于parquet格式的文件则创建新的文件进行追加。

/***Specifiesthebehaviorwhendataortablealreadyexists.Optionsinclude:*-`SaveMode.Overwrite`:overwritetheexistingdata.*-`SaveMode.Append`:appendthedata.*-`SaveMode.Ignore`:ignoretheoperation(i.e.no-op).//默认操作*-`SaveMode.ErrorIfExists`:defaultoption,throwanexceptionatruntime.**@since1.4.0*/defmode(saveMode:SaveMode):DataFrameWriter={this.mode=saveModethis}

2. 通过模式匹配接收外部参数

/***Specifiesthebehaviorwhendataortablealreadyexists.Optionsinclude:*-`overwrite`:overwritetheexistingdata.*-`append`:appendthedata.*-`ignore`:ignoretheoperation(i.e.no-op).*-`error`:defaultoption,throwanexceptionatruntime.**@since1.4.0*/defmode(saveMode:String):DataFrameWriter={this.mode=saveMode.toLowerCasematch{case"overwrite"=>SaveMode.Overwritecase"append"=>SaveMode.Appendcase"ignore"=>SaveMode.Ignorecase"error"|"default"=>SaveMode.ErrorIfExistscase_=>thrownewIllegalArgumentException(s"Unknownsavemode:$saveMode."+"Acceptedmodesare'overwrite','append','ignore','error'.")}this}

DataFrameWriter.save()

1. save将结果保存传入的路径。

/***Savesthecontentofthe[[DataFrame]]atthespecifiedpath.**@since1.4.0*/defsave(path:String):Unit={this.extraOptions+=("path"->path)save()}

2. 追踪save方法。

/***Savesthecontentofthe[[DataFrame]]asthespecifiedtable.**@since1.4.0*/defsave():Unit={ResolvedDataSource(df.sqlContext,source,partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),mode,extraOptions.toMap,df)}

3. 其中source是SQLConf的defaultDataSourceNameprivate var source: String = df.sqlContext.conf.defaultDataSourceName其中DEFAULT_DATA_SOURCE_NAME默认参数是parquet。

//ThisisusedtosetthedefaultdatasourcevalDEFAULT_DATA_SOURCE_NAME=stringConf("spark.sql.sources.default",defaultValue=Some("org.apache.spark.sql.parquet"),doc="Thedefaultdatasourcetouseininput/output.")

DataFrame.scala中部分函数详解:

1. toDF函数是将RDD转换成DataFrame

/***Returnstheobjectitself.*@groupbasic*@since1.3.0*///ThisisdeclaredwithparenthesestopreventtheScalacompilerfromtreating//`rdd.toDF("1")`asinvokingthistoDFandthenapplyonthereturnedDataFrame.deftoDF():DataFrame=this

2. show()方法:将结果显示出来

/***Displaysthe[[DataFrame]]inatabularform.Forexample:*{{{*yearmonthAVG('AdjClose)MAX('AdjClose)*1980120.5032180.595103*1981010.5232890.570307*1982020.4365040.475256*1983030.4105160.442194*1984040.4500900.483521*}}}*@paramnumRowsNumberofrowstoshow*@paramtruncateWhethertruncatelongstrings.Iftrue,stringsmorethan20characterswill*betruncatedandallcellswillbealignedright**@groupaction*@since1.5.0*///scalastyle:offprintlndefshow(numRows:Int,truncate:Boolean):Unit=println(showString(numRows,truncate))//scalastyle:onprintln

追踪showString源码如下:showString中触发action收集数据。

/***Composethestringrepresentingrowsforoutput*@param_numRowsNumberofrowstoshow*@paramtruncateWhethertruncatelongstringsandaligncellsright*/private[sql]defshowString(_numRows:Int,truncate:Boolean=true):String={valnumRows=_numRows.max(0)valsb=newStringBuildervaltakeResult=take(numRows+1)valhasMoreData=takeResult.length>numRowsvaldata=takeResult.take(numRows)valnumCols=schema.fieldNames.length

看完上述内容,你们对Spark SQL数据加载和保存的实例分析有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注亿速云行业资讯频道,感谢大家的支持。