Demonstrating PySpark's mapPartitions() with a Pi calculation and an average calculation


Here is the original Pi example that ships with the PySpark installation:

from __future__ import print_function
import sys
from random import random
from operator import add
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Usage: pi [partitions]
    #
    # Monte Carlo estimate of Pi: draw random points in the square
    # [-1, 1) x [-1, 1) and count how many land inside the unit circle.
    spark = SparkSession.builder.appName("PythonPi").getOrCreate()

    # The partition count is the optional first command-line argument.
    if len(sys.argv) > 1:
        partitions = int(sys.argv[1])
    else:
        partitions = 2
    n = 100000 * partitions

    def f(_):
        """Return 1 if a freshly drawn random point is inside the unit circle, else 0.

        The argument (the RDD element) is ignored; it only drives how many
        samples are taken.
        """
        x = random() * 2 - 1
        y = random() * 2 - 1
        if x ** 2 + y ** 2 <= 1:
            return 1
        return 0

    # One RDD element per sample; map() calls f once for every element.
    samples = spark.sparkContext.parallelize(range(1, n + 1), partitions)
    count = samples.map(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n))

    spark.stop()

~ * ~

Here is the same code rewritten to use mapPartitions():

from __future__ import print_function
import sys
from random import random
from operator import add
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Usage: pi [partitions]
    #
    # Monte Carlo estimate of Pi: draw random points in the square
    # [-1, 1) x [-1, 1) and count how many land inside the unit circle.
    spark = SparkSession\
        .builder\
        .appName("PythonPi")\
        .getOrCreate()

    # The partition count is the optional first command-line argument.
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

    def f(iterator):
        """Count the random points that fall inside the unit circle for one partition.

        mapPartitions() calls this once per partition with an iterator over
        that partition's elements. One point is sampled per element; the
        element values themselves are ignored.

        Returns a one-element list holding the partition's hit count, because
        mapPartitions() expects an iterable of output records.
        """
        hits = 0
        # Keep a running total instead of collecting one list entry per
        # sample, so memory use does not grow with the partition size.
        for _ in iterator:
            x = random() * 2 - 1
            y = random() * 2 - 1
            if x ** 2 + y ** 2 <= 1:
                hits += 1
        return [hits]

    # Each partition contributes a single partial count; reduce(add) sums them.
    count = spark.sparkContext.parallelize(range(1, n + 1), partitions).mapPartitions(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n))

    spark.stop()

~ * ~

From the book Learning Spark (Holden Karau, Andy Konwinski, Patrick Wendell, and Matei Zaharia; O'Reilly, 2015):

Example 6-13. Average without mapPartitions() in Python
    
    def combineCtrs(c1, c2):
        """Add two (sum, count) pairs element-wise and return the result as a tuple."""
        total = c1[0] + c2[0]
        count = c1[1] + c2[1]
        return (total, count)
    
    def basicAvg(nums):
        """Compute the average of the numbers in ``nums``.

        ``nums`` must provide ``map`` and ``reduce`` (a PySpark RDD in this
        article's context). Every number is turned into a ``(num, 1)`` pair,
        the pairs are summed element-wise with ``combineCtrs``, and the total
        is divided by the count.

        Returns the average as a float.
        """
        sumCount = nums.map(lambda num: (num, 1)).reduce(combineCtrs)
        # float() forces true division even for integer sums on Python 2.
        return sumCount[0] / float(sumCount[1])
    
Example 6-14. Average with mapPartitions() in Python
    
    def partitionCtr(nums):
        """Sum and count the numbers of a single partition.

        ``nums`` is any iterable of numbers. The result is a one-element list
        containing ``[sum, count]``, so that it can be returned directly from
        a mapPartitions() call.
        """
        total = 0
        count = 0
        for value in nums:
            total += value
            count += 1
        return [[total, count]]
    
    def fastAvg(nums):
        """Compute the average of ``nums`` using one partial result per partition.

        ``partitionCtr`` reduces every partition to a single ``[sum, count]``
        record; ``combineCtrs`` then merges those records, and the overall
        sum is divided by the overall count.
        """
        perPartition = nums.mapPartitions(partitionCtr)
        totals = perPartition.reduce(combineCtrs)
        return totals[0] / float(totals[1])

No comments:

Post a Comment