SparkSQL简单使用-创新互联

==> 什么是 Spark SQL?

创新互联公司专注于企业成都全网营销、网站重做改版、六枝网站定制设计、自适应品牌网站建设、H5场景定制商城网站建设、集团公司官网建设、成都外贸网站制作、高端网站制作、响应式网页设计等建站业务,价格优惠性价比高,为六枝等各大城市提供网站开发制作服务。

    ---> Spark SQL 是 Spark 用来处理结构化数据的一个模块


   ---> 作用:提供一个编程抽象(DataFrame) 并且作为分布式 SQL 查询引擎

   --->运行原理:将 Spark SQL 转化为 RDD, 然后提交到集群执行

   ---> 特点:

        ----容易整合

      ----统一的数据访问方式

      ---- 兼容 Hive

      ---- 标准的数据连接

==> SparkSession

   ---> 特点:(2.0引用 SparkSession)

        ---- 为用户提供一个统一的切入点使用Spark 各项功能

      ---- 允许用户通过它调用 DataFrame 和 Dataset 相关 API 来编写程序

      ---- 减少了用户需要了解的一些概念,可以很容易的与 Spark 进行交互

      ---- 与 Spark 交互之时不需要显示的创建 SparkConf, SparkContext 以及 SQlContext,这些对象已经封闭在 SparkSession 中

==> DataFrames  组织成命名列的数据集,等同于数据库中的表

    ---> 与 RDD 相比较:

      ---- RDD                是分布式的 Java 对象 的集合

      ---- DataFrame     是分布式 Row 对象的集合

   ---> 创建 DataFrames

        ---- 通过 case class 创建 DataFrames

// 定义 case class (相当于表的结构)
case class Emp(Empno:Int, ename:String, job:String, mgr:String, hiredate:String, sal:Int, comm:String, deptno:Int)   

// 将 HDFS 上的数据读入 RDD, 并将 RDD 与 case class 关联
val lines = sc.textFile("hdfs://bigdata0:9000/input/emp.csv").map(_.split(","))
val emp = lines.map(x=> Emp(x(0).toInt, x(1), x(2), x(3), x(4), x(5).toInt, x(6), x(7).toInt)) `

// 将RDD 转换成 DataFrames
val empDF = emp.toDF

// 通过 DataFrames 查询数据
empDF.show

      ---- 通过 SparkSession 创建 DataFrames

// 创建 StructType 来定义结构,注意,需要先导入模块
import org.apache.spark.sql.types._
val myschema = StructType(List(
                StructField("empno", DataTypes.IntegerType), 
                StructField("ename", DataTypes.StringType),
                StructField("job", DataTypes.StringType),
                StructField("mgr", DataTypes.StringType),
                StructField("hiredate", DataTypes.StringType),
                StructField("sal", DataTypes.IntegerType),
                StructField("comm", DataTypes.StringType),
                StructField("deptno", DataTypes.IntegerType)
                ))
                
// 读入数据且切分数据
val empcsvRDD = sc.textFile("hdfs://bigdata0:9000/input/emp.csv").map(_.split(","))
// 将 RDD 数据映射成 Row,需要 import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
val rowRDD = empcsvRDD.map(line=> Row(line(0).toInt, line(1), line(2), line(3),line(4), line(5).toInt, line(6), line(7).toInt)
// 创建 DataFrames
val df = spark.createDataFrame(rowRDD, myschema)
// 查看表
df.show

    ---- 使用 Json 文件来创建 DataFrame

val df = spark.read.json("Json 文件")
// 查看数据
df.show

   ---> DataFrame 操作 DataFrame 操作也称为无类型的 Dataset操作


    ---- 查询所有员工姓名

df.select("ename").show

    ---- 查询所有员工姓名和薪水,并给薪水加 100 元

df.select($"ename", $"sal", $"sal"+ 100).show

    ---- 查询工资大于2000的员工

df.select($"sal" > 2000).show

    ---- 求每个部门员工数

df.groupBy($"deptno").count.show

    ---- 在 DataFrame 中使用 SQL 语句  注: 需要首先将 DataFrame 注册成表(视图)

df.createOrReplaceTempView("emp")
// 执行查询
spark.sql("select * from emp").show

   --->临时视图(2种):

    ----只在当前会话中有效            df.createOrReplaceTempView("emp1")

    ----在全局有效                    df.createGlobalTempView("emp2")

==> Datasets

    ---> 数据的分布式集合

   --->特点:

        ----Spark1.6中添加的新接口,是DataFrame之上更高一级的抽象

      ----提供了 RDD的优点(强类型化,使用 lambda函数的能力)

      ----Spark SQL 优化后的执行引擎

      ----可以从 JVM 对象构造,然后使用函数转换(map, flatMap, filter等)去操作

      ----支持 Scala 和 Java,不支持 Python

   ---> 创建 DataSet

    ----使用序列

// 定义 case class
case class MyData(a:String, b:String)
// 生成序列并创建 DataSet
val ds = Seq(MyData(1, "Tom"), MyData(2, "Mary")).toDS
// 查看结果
ds.show

    ----使用 Json 数据

// 定义 case class 
case class Person(name:String, gender:String)
//通过 Json 数据生成 DataFrame
val df = spark.read.json(sc.parallelize("""{"gender":"Male", "name": "Tom"}""" ::Nil))
// 将 DataFrame 转成 DataSet
df.as[Person].show
df.as[Person].collect

    ----通过使用 DHFS 执行 WordCount 程序

// 读取 HDFS 数据,并创建 DataSet
val linesDS = spark.read.text("hdfs://bigdata0:9000/input/data.txt").as[String]
// 对DataSet 进行操作:分词后, 查询长度大于3 的单词
val words = linesDS.flatMap(_.split(" ")).filter(_.length > 3)
// 查看结果
words.show
words.collect

// 执行wordcount 程序
val result = linesDS.flatMap(_.split(" ").map((_.1)).groupByKey(x=> x._1).count)
result.show
// 排序
result.orderBy($"value").show

==> Datasets 操作

    ---> joinWith 和 join 的区别是连接后的新 Dataset 的 schema 会不一样

// 使用 emp.json 生成 DataFrame
val empDF = spark.read.json("/root/resources/emp.json")
// 查询工资大于 3000 的员工
empDF.where($"sal" > 3000).show
// 创建 case class
case class Emp(empno:Lone, ename:String, job:String, hiredate:String, mgr:String, sal:Long, comm:String, deptno:Long)
// 生成 DataSets,并查询数据
val empDS = empDF.as[Emp]
// 查询工资大于 3000 的员工
empDS.filter(_.sal > 3000).show
// 查看 10 号部门的员工
empDS.filter(_.deptno == 10)
// 多表查询
// 1.创建部门表
val deptRDD = sc.textFile("/test/dept.csv").map(_.split(","))
case class Dept(deptno:Int, dname:String, loc:String)
val deptDS = deptRDD.map(x=>Dept(x(0).toInt, x(1), x(2))).toDS

// 2.创建员工表
case class Emp(empno:Int, ename:String, job:String, mgr:String, hiredate:String, sal:Int, comm:String, deptno:Int)
val empRDD = sc.textFile("/test/emp.csv").map(_.split(","))
val empDS = empRDD.map(x=> Emp(x(0).toInt, x(1), x(2), x(3), x(4), x(5).toInt, x(6), x(7).toInt))

// 3.执行多表查询: 等值链接
val result = deptDF.join(empDS, "deptno")

// 另一种写法: 注意有三个等号
val result = deptDS.joinWith(empDS, deptDS("deptno") === empDS("deptno"))
// 查看执行计划
result.explain

另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。


网站名称:SparkSQL简单使用-创新互联
链接URL:http://pcwzsj.com/article/dodiho.html