Spark sql数据倾斜优化的一个演示案例
以统计词频演示spark利用先局部聚合再全局聚合解决数据倾斜的例子.import org.apache.spark.sql.{DataFrame, SparkSession}object DataSkewDemo1 extends App {private val session: SparkSession = SparkSession.builder().appName("test").mast
·
以统计词频演示spark利用先局部聚合再全局聚合解决数据倾斜的例子.
import org.apache.spark.sql.{DataFrame, SparkSession}
object DataSkewDemo1 extends App {
private val session: SparkSession = SparkSession.builder().appName("test").master("local").getOrCreate()
//创建list,其中a较多,处于同一个分区,下文加随机数字打乱重新分区
private val list = List("a a a a a a b c b b c e f f", "s g b a a a a a a a a a a")
import session.implicits._
//list窜DF对象
private val df: DataFrame = list.toDF("line")
//创建临时视图
df.createTempView("tmp")
//分组统计每个单词的数量,
println("未进行数据倾斜优化")
//统计词频(例题空格拆分,之后利用explode展开,之后再统计)
session.sql(
"""
|select word,count(1)
|from
|(select
|explode(split(line," ")) word
|from tmp) t
|group by t.word
|""".stripMargin).show()
println("在word前添加随机数字")
//在单词前添加随机数字 0 1 2,rand生成0-1的随机小数,,floor向下取整
//添加随机数字的单词起别名prefix_word 方便后续操作
val randsql =
"""
|select t.word,concat(floor(rand()*3),"_",t.word) prefix_word
|from
|(select
|explode(split(line," ")) word
|from tmp) t
|""".stripMargin
session.sql(randsql).show()
//局部聚合,将同一key加上前缀产生多个不同的key,然后相同前缀的key进行局部聚合
//对prefix_word进行聚合计数
println("统计局部聚合结果")
val partSql =
"""
|select t1.prefix_word,count(1) prefix_count
|from
|(select t.word,concat(floor(rand()*3),"_",t.word) prefix_word
|from
|(select
|explode(split(line," ")) word
|from tmp) t
|) t1
|group by t1.prefix_word
|""".stripMargin
session.sql(partSql).show()
//去掉前缀完成全局聚合
//substring截取字符串,3个参数,待处理的字符串,截取启示位置,截取长度
//instr() 返回prefix_word中第一次出现_的位置
//substring(prefix_word,instr(prefix_word,"_")+1)整体的作业是去掉之前加的随机数字
println("全局聚合结果")
val fullsql =
"""
|select substring(prefix_word,instr(prefix_word,"_")+1) f_word,sum(prefix_count) sum
|from
|(select t1.prefix_word,count(1) prefix_count
|from
|(select t.word,concat(floor(rand()*3),"_",t.word) prefix_word
|from
|(select
|explode(split(line," ")) word
|from tmp) t
|) t1
|group by t1.prefix_word) t2
|group by f_word
|""".stripMargin
session.sql(fullsql).show()
}
运行结果
未进行数据倾斜优化
+----+--------+
|word|count(1)|
+----+--------+
| g| 1|
| f| 2|
| e| 1|
| c| 2|
| b| 4|
| a| 16|
| s| 1|
+----+--------+
在word前添加随机数字
+----+-----------+
|word|prefix_word|
+----+-----------+
| a| 0_a|
| a| 2_a|
| a| 0_a|
| a| 2_a|
| a| 0_a|
| a| 2_a|
| b| 0_b|
| c| 2_c|
| b| 1_b|
| b| 1_b|
| c| 0_c|
| e| 2_e|
| f| 0_f|
| f| 0_f|
| s| 2_s|
| g| 2_g|
| b| 2_b|
| a| 0_a|
| a| 0_a|
| a| 0_a|
+----+-----------+
only showing top 20 rows
统计局部聚合结果
+-----------+------------+
|prefix_word|prefix_count|
+-----------+------------+
| 1_s| 1|
| 0_c| 1|
| 1_a| 7|
| 0_b| 4|
| 0_g| 1|
| 2_a| 2|
| 2_e| 1|
| 0_f| 1|
| 0_a| 7|
| 1_c| 1|
| 1_f| 1|
+-----------+------------+
全局聚合结果
+------+---+
|f_word|sum|
+------+---+
| g| 1|
| f| 2|
| e| 1|
| c| 2|
| b| 4|
| a| 16|
| s| 1|
+------+---+
总结
- 如果某个分区内容特别多,可以加随机数字打乱,之后局部聚合,再全局聚合
- 添加的随机数字再最终聚合时要去掉
附注:上例中用到的sql函数的用法
instr用法
instr从1开始计数
SELECT INSTR("0_a","_")
2
substring用法
substring从1开始计数,由上例可知,INSTR("0_a","_")
结果为2. substring从第2个字符开始截取到最后(包括第二个字符)
SELECT SUBSTRING("0_a",INSTR("0_a","_"))
_a
rand用法
SELECT RAND()
0.7521917799042461
SELECT RAND()*3
1.9326316322503907
SELECT FLOOR(RAND()*3)
2
split用法
第一个参数待拆分字符,第二个参数拆分依据
hive> SELECT split("a b c d"," ");
["a","b","c","d"]
explode用法
hive> SELECT explode(split("a b c d"," "));
OK
a
b
c
d

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐
所有评论(0)