/**
 * Take the first num elements of the RDD. It works by first scanning one partition, and use
 * the results from that partition to estimate the number of additional partitions needed
 * to satisfy the limit.
 *
 * @note This method should only be used if the resulting array is expected to be small, as
 * all the data is loaded into the driver's memory.
 * In other words, call this only when the resulting array is expected to be small, i.e. all of
 * its data fits in the driver's memory. The doc says the amount of data handled should be
 * small, but it does not say what happens when the data is large, so we need to keep reading
 * the implementation.
 *
 * @note Due to complications in the internal implementation, this method will raise
 * an exception if called on an RDD of `Nothing` or `Null`.
 */
def take(num: Int): Array[T] = withScope {
  // scaleUpFactor is literally the scale-up factor: as the example in the figure above shows,
  // the number of partitions scanned grows by a fixed multiple on each retry.
  val scaleUpFactor = Math.max(conf.getInt("spark.rdd.limit.scaleUpFactor", 4), 2)
  if (num == 0) {
    new Array[T](0)
  } else {
    val buf = new ArrayBuffer[T]
    val totalParts = this.partitions.length
    var partsScanned = 0
    // This loop is the key to why take retries when a pass does not collect enough elements.
    while (buf.size < num && partsScanned < totalParts) {
      // The number of partitions to try in this iteration. It is ok for this number to be
      // greater than totalParts because we actually cap it at totalParts in runJob.
      // numPartsToTry: the number of partitions to scan in this iteration, 1 by default.
      var numPartsToTry = 1L
      val left = num - buf.size
      if (partsScanned > 0) {
        // If we didn't find any rows after the previous iteration, quadruple and retry.
        // Otherwise, interpolate the number of partitions we need to try, but overestimate
        // it by 50%. We also cap the estimation in the end.
        // Key point! If the previous iteration found no rows at all (or at least not enough
        // of them), retry with the partition count multiplied by scaleUpFactor
        // ("quadruple and retry" with the default factor of 4).
        if (buf.isEmpty) {
          numPartsToTry = partsScanned * scaleUpFactor
        } else {
          // As left > 0, numPartsToTry is always >= 1
          numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
          numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor)
        }
      }
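      // Illustrative trace (assumed numbers, default scaleUpFactor = 4, take(1000) on a large RDD):
      //   pass 1: partsScanned = 0, numPartsToTry = 1                    -> scan partition 0
      //   pass 2: buf is still empty, so numPartsToTry = 1 * 4 = 4       -> scan partitions 1-4
      //   pass 3: buf has 100 rows, left = 900, so
      //           numPartsToTry = ceil(1.5 * 900 * 5 / 100) = 68, capped at 5 * 4 = 20
      //                                                                  -> scan partitions 5-24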
      val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
      val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)
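      // runJob submits a job over only the partitions in range p; each task takes at most
      // `left` elements from its own partition, and the driver gets back one Array[T] per
      // scanned partition, so res has type Array[Array[T]].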