一,基础应用
(1)创建createDataFrame
package com.hollysys.spark
import java.util
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext, SparkSession}
object CreateDataFrameTest {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName(this.getClass.getSimpleName).master("local")
.getOrCreate()
//通过Seq生成
val df = spark.createDataFrame(Seq(
("ming", 20, 15552211521L),
("hong", 19, 13287994007L),
("zhi", 21, 15552211523L)
)) toDF("name", "age", "phone")
df.show()
//动态创建schema
val schema = StructType(List(
StructField("name", StringType, true),
StructField("age", IntegerType, true),
StructField("phone", LongType, true)
))
val dataList = new util.ArrayList[Row]()
dataList.add(Row("ming",20,15552211521L))
dataList.add(Row("hong",19,13287994007L))
dataList.add(Row("zhi",21,15552211523L))
spark.createDataFrame(dataList,schema).show()
}
}
运行效果:

如果对此感兴趣,请查询链接:Spark创建DataFrame的几种方式_spark.createdataframe-CSDN博客
(2) select和selectExpr方法
//selece方法 import org.apache.spark.sql.SparkSession object select { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() //创建Spark会话 .appName("Spark SQL basic example") //设置会话名称 .master("local") //设置本地模式 .getOrCreate() //创建会话变量 val rdd = spark.sparkContext.parallelize(Array(1,2,3,4)) import spark.implicits._ val df = rdd.toDF("id") df.select("id").show() //选择“id”列 } }//selectExpr方法
import org.apache.spark.sql.SparkSession object select { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() //创建Spark会话 .appName("Spark SQL basic example") //设置会话名称 .master("local") //设置本地模式 .getOrCreate() //创建会话变量 val rdd = spark.sparkContext.parallelize(Array(1,2,3,4)) import spark.implicits._ val df = rdd.toDF("id") df.selectExpr("id as ID").show() //设置了一个别名ID } }
分开分别运行后,查看运行结果:
select方法 selectExpr方法

(3)collect方法
import org.apache.spark.sql.SparkSession
object collect {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder() //创建Spark会话
.appName("Spark SQL basic example") //设置会话名称
.master("local") //设置本地模式
.getOrCreate() //创建会话变量
val rdd = spark.sparkContext.parallelize(Array(1,2,3,4))
import spark.implicits._
val df = rdd.toDF("id")
val arr = df.collect()
println(arr.mkString("Array(", ", ", ")"))
}
}
运行结果:
(4)DataFrame计算行数count方法
import org.apache.spark.sql.SparkSession
object count {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder() //创建Spark会话
.appName("Spark SQL basic example") //设置会话名称
.master("local") //设置本地模式
.getOrCreate() //创建会话变量
val rdd = spark.sparkContext.parallelize(Array(1,2,3,4))
import spark.implicits._
val df = rdd.toDF("id")
println(df.count()) //计算行数
}
}
运行结果

(5)过滤数据的filter方法
import org.apache.spark.sql.SparkSession
object fliter {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder() //创建Spark会话
.appName("Spark SQL basic example") //设置会话名称
.master("local") //设置本地模式
.getOrCreate() //创建会话变量
val rdd = spark.sparkContext.parallelize(Array(1,2,3,4))
import spark.implicits._
val df = rdd.toDF("id")
val df2 = df.filter("id>3")//过滤id列大于3的数据(行)或 _ >= 3
println(df2.cache().show()) //打印结果
}
}
运行效果

(6)以整体数据为单位操作数据的flatMap方法
import org.apache.spark.sql.SparkSession
object flatmap {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder() //创建Spark会话
.appName("Spark SQL basic example") //设置会话名称
.master("local") //设置本地模式
.getOrCreate() //创建会话变量
val rdd = spark.sparkContext.parallelize(Seq("hello!spark", "hello!hadoop"))
import spark.implicits._
val df = rdd.toDF("id")
val x = df.flatMap(x => x.toString().split("!")).collect()
println(x.mkString("Array(", ", ", ")"))
}
}
运行效果

(7)分组数据的groupBy和agg方法
import org.apache.spark.sql.SparkSession
object GroupByExample {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.master("local")
.getOrCreate()
// 导入隐式转换,获取默认的编码器
import spark.implicits._
// 创建 JSON 字符串数组的 Dataset
val jsonDataSet = spark.createDataset(Array(
"{\"name\":\"ming\",\"age\":20,\"phone\":15552211521}",
"{\"name\":\"hong\", \"age\":19,\"phone\":13287994007}",
"{\"name\":\"zhi\", \"age\":21,\"phone\":15552211523}"
))
// 将 JSON 数据集转换为 DataFrame
val jsonDataSetDf = spark.read.json(jsonDataSet)
// 显示 DataFrame 的内容
jsonDataSetDf.groupBy("name").agg("age" -> "count").show()
spark.stop()
}
}
运行效果:

(8)删除数据集中某列的drop方法
import org.apache.spark.sql.SparkSession
object GroupByExample {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.master("local")
.getOrCreate()
// 导入隐式转换,获取默认的编码器
import spark.implicits._
// 创建 JSON 字符串数组的 Dataset
val jsonDataSet = spark.createDataset(Array(
"{\"name\":\"ming\",\"age\":20,\"phone\":15552211521}",
"{\"name\":\"hong\", \"age\":19,\"phone\":13287994007}",
"{\"name\":\"zhi\", \"age\":21,\"phone\":15552211523}"
))
// 将 JSON 数据集转换为 DataFrame
val jsonDataSetDf = spark.read.json(jsonDataSet)
// 显示 DataFrame 的内容
jsonDataSetDf.drop("age").show()
//删除age列
spark.stop()
}
}
