Spark

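Requirement: count word frequencies with Spark, then find the 10 most frequent words together with their counts.
Problem: when top() is used to compute the top N, it returns the wrong words.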

We test with "That Girl", the song middle schoolers sang so much that it ended up paid-only:
[Image: data format]

Step 1: count word frequencies with the flatMap(), map(), reduceByKey(), and filter() operators

import org.apache.spark.{SparkConf, SparkContext}

def wordcount(): Unit = {
  val conf = new SparkConf().setAppName("wordcount").setMaster("local[*]")
  val sc = new SparkContext(conf)
  sc.setLogLevel("ERROR")

  val rdd1 = sc.textFile("song.txt")
  rdd1.flatMap(_.split(" "))           // split each line into words
    .map(x => (x, 1))                  // pair each word with a count of 1
    .reduceByKey((v1, v2) => v1 + v2)  // sum the counts per word
    .filter(x => x._1 != "")           // drop empty tokens
    .foreach(println)
  sc.stop()
}

Output:
[Image: output]

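Step 2: sort by word frequency in descending order

Note: sortBy(x => x._2, false, 1) sets the number of partitions to 1. sortBy() itself sorts globally, but foreach(println) runs on each partition independently, so with more than one partition the printed lines can interleave and look unsorted.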
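An alternative to forcing a single partition is to bring the sorted result back to the driver with collect() before printing, since collect() returns the partitions in order. The following is only a minimal sketch: the object name, the helper sortedCounts, and the sample counts are made up for illustration and are not from the original job.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SortByCollectDemo {
  // Hypothetical helper: returns the (word, count) pairs ordered by count, descending.
  def sortedCounts(sc: SparkContext, pairs: Seq[(String, Int)]): Array[(String, Int)] =
    sc.parallelize(pairs, 4)             // deliberately spread over 4 partitions
      .sortBy(_._2, ascending = false)   // global sort, partition count left at its default
      .collect()                         // partitions come back to the driver in order

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sortByCollect").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    // Made-up counts, not taken from song.txt.
    val pairs = Seq(("a", 3), ("that", 5), ("girl", 7), ("the", 9), ("zoo", 1))
    sortedCounts(sc, pairs).foreach(println) // printed on the driver, in sorted order
    sc.stop()
  }
}
```

Printing on the driver avoids the interleaving, at the cost of pulling the whole result into driver memory, so it only suits small results like a word count of one song.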

import org.apache.spark.{SparkConf, SparkContext}

def wordcount(): Unit = {
  val conf = new SparkConf().setAppName("wordcount").setMaster("local[*]")
  val sc = new SparkContext(conf)
  sc.setLogLevel("ERROR")

  val rdd1 = sc.textFile("song.txt")
  rdd1.flatMap(_.split(" "))
    .map(x => (x, 1))
    .reduceByKey((v1, v2) => v1 + v2)
    .filter(x => x._1 != "")
    .sortBy(x => x._2, false, 1)  // descending by count, in a single partition
    .foreach(println)
  sc.stop()
}

Output:
[Image: output]

Step 3: use the top() operator to get the top 10

import org.apache.spark.{SparkConf, SparkContext}

def wordcount(): Unit = {
  val conf = new SparkConf().setAppName("wordcount").setMaster("local[*]")
  val sc = new SparkContext(conf)
  sc.setLogLevel("ERROR")

  val rdd1 = sc.textFile("song.txt")
  rdd1.flatMap(_.split(" "))
    .map(x => (x, 1))
    .reduceByKey((v1, v2) => v1 + v2)
    .filter(x => x._1 != "")
    .sortBy(x => x._2, false, 1)
    .top(10)  // uses the implicit Ordering for (String, Int)
    .foreach(println)
  sc.stop()
}

Output:
[Image: output]

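Note: here is the problem. Comparing the outputs of step 2 and step 3 shows that the top 10 result is not the first ten words of the descending sort!
Let's look at the source of top():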

def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  takeOrdered(num)(ord.reverse)
}

Source of takeOrdered(), which top() calls:

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  if (num == 0) {
    Array.empty
  } else {
    val mapRDDs = mapPartitions { items =>
      val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
      queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
      Iterator.single(queue)
    }
    if (mapRDDs.partitions.length == 0) {
      Array.empty
    } else {
      mapRDDs.reduce { (queue1, queue2) =>
        queue1 ++= queue2
        queue1
      }.toArray.sorted(ord)
    }
  }
}
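The shape of takeOrdered() (pick candidates inside each partition, then merge them) can be mimicked with plain Scala collections. This is only a rough sketch, not Spark's implementation: each inner Seq stands in for one partition, the object name is made up, and a simple sort-and-take replaces the BoundedPriorityQueue.

```scala
object TakeOrderedSketch {
  // Each inner Seq plays the role of one RDD partition (an assumption of this sketch).
  def takeOrdered[T](partitions: Seq[Seq[T]], num: Int)(implicit ord: Ordering[T]): Seq[T] = {
    if (num == 0 || partitions.isEmpty) {
      Seq.empty
    } else {
      partitions
        .map(p => p.sorted(ord).take(num)) // per partition: keep the num smallest under ord
        .reduce(_ ++ _)                    // merge the per-partition candidates
        .sorted(ord)
        .take(num)                         // keep the num smallest overall
    }
  }

  // Mirrors top(): the largest elements are the smallest under the reversed ordering.
  def top[T](partitions: Seq[Seq[T]], num: Int)(implicit ord: Ordering[T]): Seq[T] =
    takeOrdered(partitions, num)(ord.reverse)

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq(5, 1, 9), Seq(7, 3), Seq(8))
    println(takeOrdered(partitions, 2).mkString(", ")) // prints: 1, 3
    println(top(partitions, 2).mkString(", "))         // prints: 9, 8
  }
}
```

Note that nothing in this flow looks at the order in which elements arrive. Only the Ordering passed in decides the result.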

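Analysis:

top() delegates to takeOrdered(), which is itself an RDD operator. Looking at the source above:

First, takeOrdered() calls mapPartitions(): it selects candidates inside each partition and then merges them, so the order produced by the earlier sortBy() plays no role.
Second, top() re-sorts the elements with the implicit Ordering[T]. For (String, Int) tuples the default ordering compares the word first and the count only on ties, so the result is the top N by word rather than by count.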
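The effect of the default tuple ordering can be reproduced without Spark. The sketch below uses a plain List and a hypothetical topN helper that behaves like top() on a single collection. The word counts are made up for illustration.

```scala
object TupleOrderingDemo {
  // Made-up word counts, already sorted by count in descending order.
  val counts: List[(String, Int)] =
    List(("the", 9), ("girl", 7), ("that", 5), ("a", 3), ("zoo", 1))

  // Hypothetical stand-in for RDD.top(n): the n largest elements under the given Ordering.
  def topN[T](xs: Seq[T], n: Int)(implicit ord: Ordering[T]): Seq[T] =
    xs.sorted(ord.reverse).take(n)

  def main(args: Array[String]): Unit = {
    // Default Ordering[(String, Int)]: compares the word first, the count only on ties.
    println(topN(counts, 2).mkString(", "))
    // prints: (zoo,1), (the,9)

    // Explicit ordering by count.
    println(topN(counts, 2)(Ordering.by[(String, Int), Int](_._2)).mkString(", "))
    // prints: (the,9), (girl,7)
  }
}
```

With the default ordering, "zoo" wins simply because it sorts last alphabetically, even though it has the lowest count.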

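Solutions:

Option 1: give top() an explicit ordering, here by value (the count): sortBy(x => x._2, false).top(10)(Ordering.by(e => e._2))
Option 2: skip top() and use sortBy(x => x._2, false).take(10)
Option 3: since top() delegates to takeOrdered(), call takeOrdered() directly. takeOrdered() returns the smallest elements, so negate the count to get the largest: takeOrdered(10)(Ordering.by(e => -e._2))
*Thought: in option 1, top() already receives an explicit ordering, so is sortBy() still needed? No, so we can drop sortBy().
*Aside: top(10)(Ordering.by(e => e._2)) is written in curried style. top() has two parameter lists, and the second one is an implicit parameter that we pass explicitly here.
*Currying: the technique of transforming a function that takes multiple arguments into a function that takes a single argument (the first argument of the original function) and returns a new function that accepts the remaining arguments and returns the result. The technique was named by Christopher Strachey after the logician Haskell Curry, although it was invented by Moses Schönfinkel and Gottlob Frege.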
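As a small illustration of multiple parameter lists in Scala, here is a sketch with two made-up methods: add shows plain currying, and largest has the same shape as top(num)(ord), with an implicit second parameter list that can also be supplied by hand.

```scala
object CurryingDemo {
  // Two parameter lists: supplying only the first yields a function awaiting the second.
  def add(a: Int)(b: Int): Int = a + b

  // Same shape as top(num)(ord): the Ordering is implicit but can be passed explicitly.
  def largest[T](xs: Seq[T], n: Int)(implicit ord: Ordering[T]): Seq[T] =
    xs.sorted(ord.reverse).take(n)

  def main(args: Array[String]): Unit = {
    val addTwo: Int => Int = add(2) // partially applied: still waiting for b
    println(addTwo(40))             // prints: 42

    // The implicit Ordering[Int] is filled in by the compiler.
    println(largest(Seq(3, 1, 2), 2).mkString(", "))
    // prints: 3, 2

    // The second parameter list supplied explicitly: order strings by length.
    println(largest(Seq("bb", "a", "ccc"), 2)(Ordering.by[String, Int](_.length)).mkString(", "))
    // prints: ccc, bb
  }
}
```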

import org.apache.spark.{SparkConf, SparkContext}

// Using top():
def wordcountWithTop(): Unit = {
  val conf = new SparkConf().setAppName("wordcount").setMaster("local[*]")
  val sc = new SparkContext(conf)
  sc.setLogLevel("ERROR")

  val rdd1 = sc.textFile("song.txt")
  rdd1.flatMap(_.split(" "))
    .map(x => (x, 1))
    .reduceByKey((v1, v2) => v1 + v2)
    .filter(x => x._1 != "")
    //.sortBy(x => x._2, false, 1)    // no longer needed
    .top(10)(Ordering.by(x => x._2))  // 10 largest by count
    .foreach(println)
  sc.stop()
}

// Using take():
def wordcountWithTake(): Unit = {
  val conf = new SparkConf().setAppName("wordcount").setMaster("local[*]")
  val sc = new SparkContext(conf)
  sc.setLogLevel("ERROR")

  val rdd1 = sc.textFile("song.txt")
  rdd1.flatMap(_.split(" "))
    .map(x => (x, 1))
    .reduceByKey((v1, v2) => v1 + v2)
    .filter(x => x._1 != "")
    .sortBy(x => x._2, false).take(10)  // first 10 of the descending sort
    .foreach(println)
  sc.stop()
}

Both give the same result:
[Image: output]

For reference, here is the source of take(), for anyone interested:

def take(num: Int): Array[T] = withScope {
  val scaleUpFactor = Math.max(conf.getInt("spark.rdd.limit.scaleUpFactor", 4), 2)
  if (num == 0) {
    new Array[T](0)
  } else {
    val buf = new ArrayBuffer[T]
    val totalParts = this.partitions.length
    var partsScanned = 0
    while (buf.size < num && partsScanned < totalParts) {
      // The number of partitions to try in this iteration. It is ok for this number to be
      // greater than totalParts because we actually cap it at totalParts in runJob.
      var numPartsToTry = 1L
      if (partsScanned > 0) {
        // If we didn't find any rows after the previous iteration, quadruple and retry.
        // Otherwise, interpolate the number of partitions we need to try, but overestimate
        // it by 50%. We also cap the estimation in the end.
        if (buf.isEmpty) {
          numPartsToTry = partsScanned * scaleUpFactor
        } else {
          // the left side of max is >=1 whenever partsScanned >= 2
          numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
          numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor)
        }
      }

      val left = num - buf.size
      val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
      val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)

      res.foreach(buf ++= _.take(num - buf.size))
      partsScanned += p.size
    }

    buf.toArray
  }
}

Summary: this is my first blog post on CSDN, and feedback from more experienced readers is very welcome!
