以统计词频演示spark利用先局部聚合再全局聚合解决数据倾斜的例子.

import org.apache.spark.sql.{DataFrame, SparkSession}

object DataSkewDemo1 extends App {
  private val session: SparkSession = SparkSession.builder().appName("test").master("local").getOrCreate()
  //创建list,其中a较多,处于同一个分区,下文加随机数字打乱重新分区
  private val list = List("a a a a a a b c b b c e f f", "s g b a a a a a a a a a a")

  import session.implicits._
	//list窜DF对象
  private val df: DataFrame = list.toDF("line")
  //创建临时视图
  df.createTempView("tmp")
  //分组统计每个单词的数量,
  println("未进行数据倾斜优化")
  //统计词频(例题空格拆分,之后利用explode展开,之后再统计)
  session.sql(
    """
      |select word,count(1)
      |from
      |(select
      |explode(split(line," ")) word
      |from tmp) t
      |group by t.word
      |""".stripMargin).show()

  println("在word前添加随机数字")
  //在单词前添加随机数字 0 1 2,rand生成0-1的随机小数,,floor向下取整
  //添加随机数字的单词起别名prefix_word 方便后续操作
  val randsql =
    """
      |select t.word,concat(floor(rand()*3),"_",t.word) prefix_word
      |from
      |(select
      |explode(split(line," ")) word
      |from tmp) t
      |""".stripMargin
  session.sql(randsql).show()
  //局部聚合,将同一key加上前缀产生多个不同的key,然后相同前缀的key进行局部聚合
  //对prefix_word进行聚合计数
  println("统计局部聚合结果")
  val partSql =
    """
      |select t1.prefix_word,count(1) prefix_count
      |from
      |(select t.word,concat(floor(rand()*3),"_",t.word) prefix_word
      |from
      |(select
      |explode(split(line," ")) word
      |from tmp) t
      |) t1
      |group by t1.prefix_word
      |""".stripMargin
  session.sql(partSql).show()

  //去掉前缀完成全局聚合
  //substring截取字符串,3个参数,待处理的字符串,截取启示位置,截取长度
  //instr() 返回prefix_word中第一次出现_的位置
  //substring(prefix_word,instr(prefix_word,"_")+1)整体的作业是去掉之前加的随机数字
  println("全局聚合结果")
  val fullsql =
    """
      |select substring(prefix_word,instr(prefix_word,"_")+1) f_word,sum(prefix_count) sum
      |from
      |(select t1.prefix_word,count(1) prefix_count
      |from
      |(select t.word,concat(floor(rand()*3),"_",t.word) prefix_word
      |from
      |(select
      |explode(split(line," ")) word
      |from tmp) t
      |) t1
      |group by t1.prefix_word) t2
      |group by f_word
      |""".stripMargin
  session.sql(fullsql).show()
}

运行结果

未进行数据倾斜优化
                                                                                +----+--------+
|word|count(1)|
+----+--------+
|   g|       1|
|   f|       2|
|   e|       1|
|   c|       2|
|   b|       4|
|   a|      16|
|   s|       1|
+----+--------+

在word前添加随机数字
+----+-----------+
|word|prefix_word|
+----+-----------+
|   a|        0_a|
|   a|        2_a|
|   a|        0_a|
|   a|        2_a|
|   a|        0_a|
|   a|        2_a|
|   b|        0_b|
|   c|        2_c|
|   b|        1_b|
|   b|        1_b|
|   c|        0_c|
|   e|        2_e|
|   f|        0_f|
|   f|        0_f|
|   s|        2_s|
|   g|        2_g|
|   b|        2_b|
|   a|        0_a|
|   a|        0_a|
|   a|        0_a|
+----+-----------+
only showing top 20 rows

统计局部聚合结果
                                                                                +-----------+------------+
|prefix_word|prefix_count|
+-----------+------------+
|        1_s|           1|
|        0_c|           1|
|        1_a|           7|
|        0_b|           4|
|        0_g|           1|
|        2_a|           2|
|        2_e|           1|
|        0_f|           1|
|        0_a|           7|
|        1_c|           1|
|        1_f|           1|
+-----------+------------+

全局聚合结果
                                                                                +------+---+
|f_word|sum|
+------+---+
|     g|  1|
|     f|  2|
|     e|  1|
|     c|  2|
|     b|  4|
|     a| 16|
|     s|  1|
+------+---+

总结

  • 如果某个分区内容特别多,可以加随机数字打乱,之后局部聚合,再全局聚合
  • 添加的随机数字再最终聚合时要去掉

附注:上例中用到的sql函数的用法

instr用法
instr从1开始计数

SELECT INSTR("0_a","_")
2

substring用法
substring从1开始计数,由上例可知,INSTR("0_a","_")结果为2. substring从第2个字符开始截取到最后(包括第二个字符)

SELECT SUBSTRING("0_a",INSTR("0_a","_"))
_a

rand用法

SELECT RAND()
0.7521917799042461
SELECT RAND()*3
1.9326316322503907
SELECT FLOOR(RAND()*3)
2

split用法
第一个参数待拆分字符,第二个参数拆分依据

hive> SELECT split("a b c d"," ");
["a","b","c","d"]

explode用法

hive> SELECT explode(split("a b c d"," "));
OK
a
b
c
d

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐