Spark - 由 foreach 引发的思考

废话不说,先贴代码

1
2
3
4
5
val numbers = sc.parallelize(Array(1,2,3,4,5,6,7,8,9))
val map = scala.collection.mutable.Map[Int, Int]()

numbers.foreach(l => {map.put(l,l)})
println(map.size) // 此时 map 的存储了几个键值对

首先我们先说个概念 —— 闭包

闭包是 Scala 中的特性,用通俗易懂的话讲就是函数内部的运算或者说函数返回值可由外部的变量所控制,用个例子解释就是:

1
2
3
4
5
6
7
var factor = 10
// multiplier 函数的返回值有有两个决定因素,输入参数变量 i 以及外部变量 factor。输入参数变量 i 是由我们调用该函数时决定的,相较于 factor 是可控的,而 factor 则是外部变量所定义,相较于 i 是不可控的
val multiplier = (i: Int) => i * factor
println(multiplier(1)) // 10

factor = 20
println(multiplier(1)) // 20

根据上述提及的闭包可知,刚才所写的代码中l => {map.put(1,1)}其所定义的函数就是一个闭包


既然标题中提到了 Spark,那就要说明闭包与 Spark 的关系了

在 Spark 中,用户自定义闭包函数并传递给相应的 RDD 所定义好的方法(如foreachmap)。Spark 在运行作业时会检查 DAG 中每个 RDD 所涉及的闭包,如是否可序列化、是否引用外部变量等。若存在引用外部变量的情况,则会将它们的副本复制到相应的工作节点上,保证程序运行的一致性

下面是 Spark 文档中解释的:

Shared Variables

Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of shared variables for two common usage patterns: broadcast variables and accumulators.

共享变量

通常情况下,当有函数传递给在远端集群节点上执行的 Spark 的算子(如mapreduce)时,Spark 会将所有在该函数内部所需要的用到的变量分别复制到相应的节点上。这些副本变量会被复制到每个节点上,且在算子执行结束后这些变量并不会回传给驱动程序(driver program)。

Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program.


总结,如果直接运行一开始所提及的程序时,那么所获得的答案是0,因为我们知道map变量会被拷贝多份至不同的工作节点上,而我们操作的也仅仅只是副本罢了

从编译器的角度来说,这段代码是一个闭包函数,而其调用了外部变量,代码上没问题。但是从运行结果中,这是错误操作方式,因为 Spark 会将其所调用的外部变量进行拷贝,并复制到相应的工作节点中,而不会对真正的变量产生任何影响

相应的解决方案有

1
2
3
4
5
val numbers = sc.parallelize(Array(1,2,3,4,5,6,7,8,9))
val map = scala.collection.mutable.Map[Int, Int]()

numbers.collect().foreach(l => {map.put(l,l)})
println(map.size)

参考资料:

  1. http://spark.apache.org/docs/2.1.0/programming-guide.html#shared-variables