Grouping Data using RDD, SQL DataFrame and PySpark.Pandas

Thursday, February 2, 2023

Requirement: Compute the revenue generated by all postpaid and prepaid service customers.

Schema details: CustomerID, Mobile Number, Gender, SeniorCitizen Flag, Mode, Calls, SMS, Internet Service Status, MonthlyCharges (USD), CustomerChurn Flag

Before solving the requirement, let us understand the concept of a Paired RDD. In most analytical programs, data in the <Key, Value> format provides a convenient way to perform computations. An RDD with this data format is called a Paired RDD: every record contains exactly two fields, a key and a value.

Solution:

Step 1: Create a paired RDD with the <Key, Value> structure. For this requirement, it is <Mode, MonthlyCharges>.
Step 2: Group the values by key and apply the sum function.
Step 3: Alternatively, take the same paired RDD and apply the reduceByKey() transformation.

In [1]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

1: Using RDD Transformations
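Before building the paired RDD from the telecom file, here is a minimal sketch (added for illustration, not part of the original post) of what a single record looks like once it is reduced to a <Mode, MonthlyCharges> pair; the sample row is taken from the DataFrame preview shown later.

# Illustration only: one CSV record reduced to a (Mode, MonthlyCharges) pair.
sample = "TXCUST00001,982120000,Male,N,PrePaid,Active,Active,InActive,20,N"
fields = sample.split(",")
pair = (fields[4], int(fields[8]))   # ('PrePaid', 20)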
pyspark.RDD.groupByKey
Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with numPartitions partitions.

Notes: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will provide much better performance.

In [2]:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.groupByKey().mapValues(len).collect())

Out[2]:
[('a', 2), ('b', 1)]

In [3]:
sorted(rdd.groupByKey().mapValues(list).collect())

Out[3]:
[('a', [1, 1]), ('b', [1])]

In [4]:
Telecomdata = sc.textFile("../in/TelecomData.csv")
PairedData = Telecomdata.map(lambda record: (record.split(",")[4], int(record.split(",")[8])))

In [5]:
# Apply the groupByKey() transformation on the paired RDD
grpRecords = PairedData.groupByKey()

# Use the map() transformation to sum the grouped values of the same key.
aggregatedData = grpRecords.map(lambda x: (x[0], sum(x[1])))

In [7]:
aggregatedData.collect()

Out[7]:
[('PostPaid', 8638), ('PrePaid', 10364)]
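A small aside (not part of the original solution): because the grouped values only need to be summed, the same step can also be written with mapValues(), which applies a function to each grouped value list and leaves the key untouched.

# Hedged alternative: mapValues(sum) is equivalent to the map() call above.
aggregatedData = grpRecords.mapValues(sum)
aggregatedData.collect()   # [('PostPaid', 8638), ('PrePaid', 10364)]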
Methods used above: map() and groupByKey()

map(): A narrow transformation used to create a new RDD from the parent RDD with the required structure and fields. It allows fetching the required fields from the parent RDD and is commonly used to create Paired RDDs.

groupByKey(): A wide transformation used to group the values of the same key. It should be used only on Paired RDDs. It results in a new RDD with the data format <Key, List of values>.

Note: In the above solution, the map() transformation is used to add all the grouped values; sum is the Python function that performs the addition. groupByKey() performs a huge amount of data shuffling, which can lead to network congestion and performance issues.

Let us solve the requirement using an alternate solution with reduceByKey().

reduceByKey(): A wide transformation that performs an aggregation operation on the values of the same key. It should be used only on Paired RDDs. It results in a new RDD with the data format <Key, Aggregated value>. It launches combiners to perform local aggregation before the final reduce, so the amount of data shuffled drops drastically and performance improves.

In [8]:
aggregatedData = PairedData.reduceByKey(lambda a, b: a + b)

In [10]:
aggregatedData.collect()

Out[10]:
[('PostPaid', 8638), ('PrePaid', 10364)]
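The groupByKey notes quoted above also mention aggregateByKey(). A hedged sketch of how it could be applied here (not shown in the original post): it takes a zero value, a function applied within each partition, and a combiner applied across partitions, and yields the same per-key totals.

# Hedged sketch: per-key sum with aggregateByKey().
aggregatedData = PairedData.aggregateByKey(
    0,                         # zero value for each key
    lambda acc, v: acc + v,    # merge a value into the accumulator (within a partition)
    lambda a, b: a + b)        # merge accumulators (across partitions)
aggregatedData.collect()       # [('PostPaid', 8638), ('PrePaid', 10364)]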
2: Using Spark SQL DataFrame

In [11]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [17]:
df = spark.read.option("header", False).csv('../in/TelecomData.csv')
df = df.withColumnRenamed("_c0", "CustomerID") \
       .withColumnRenamed("_c1", "Mobile Number") \
       .withColumnRenamed("_c2", "Gender") \
       .withColumnRenamed("_c3", "SeniorCitizen") \
       .withColumnRenamed("_c4", "Mode") \
       .withColumnRenamed("_c5", "Calls") \
       .withColumnRenamed("_c6", "SMS") \
       .withColumnRenamed("_c7", "InternetServiceStatus") \
       .withColumnRenamed("_c8", "MonthlyCharges") \
       .withColumnRenamed("_c9", "CustomerChurn")

In [18]:
df.show(5)

+-----------+-------------+------+-------------+--------+------+------+---------------------+--------------+-------------+
| CustomerID|Mobile Number|Gender|SeniorCitizen|    Mode| Calls|   SMS|InternetServiceStatus|MonthlyCharges|CustomerChurn|
+-----------+-------------+------+-------------+--------+------+------+---------------------+--------------+-------------+
|TXCUST00001|    982120000|  Male|            N| PrePaid|Active|Active|             InActive|            20|            N|
|TXCUST00002|    982120001|  Male|            N|PostPaid|Active|Active|             InActive|            25|            N|
|TXCUST00003|    982120002|  Male|            N| PrePaid|Active|Active|             InActive|            20|            Y|
|TXCUST00004|    982120003|  Male|            Y| PrePaid|Active|Active|             InActive|            25|            N|
|TXCUST00005|    982120004|  Male|            N| PrePaid|Active|Active|             InActive|            15|            N|
+-----------+-------------+------+-------------+--------+------+------+---------------------+--------------+-------------+
only showing top 5 rows
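An alternative sketch (my assumption, not from the original post): supplying an explicit schema names and types the columns in a single read, which also avoids the string-to-int cast needed later. The names schema and df_typed are chosen here for illustration.

# Hedged sketch: read the CSV with an explicit schema instead of renaming columns.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("CustomerID", StringType()),
    StructField("Mobile Number", StringType()),
    StructField("Gender", StringType()),
    StructField("SeniorCitizen", StringType()),
    StructField("Mode", StringType()),
    StructField("Calls", StringType()),
    StructField("SMS", StringType()),
    StructField("InternetServiceStatus", StringType()),
    StructField("MonthlyCharges", IntegerType()),   # numeric from the start
    StructField("CustomerChurn", StringType()),
])
df_typed = spark.read.csv('../in/TelecomData.csv', header=False, schema=schema)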
pyspark.sql.DataFrame.groupBy

Groups the DataFrame using the specified columns, so we can run aggregation on them. See GroupedData for all the available aggregate functions. groupby() is an alias for groupBy().

Ex:
>>> df.groupBy().avg().collect()
[Row(avg(age)=3.5)]
>>> sorted(df.groupBy('name').agg({'age': 'mean'}).collect())
[Row(name='Alice', avg(age)=2.0), Row(name='Bob', avg(age)=5.0)]
>>> sorted(df.groupBy(df.name).avg().collect())
[Row(name='Alice', avg(age)=2.0), Row(name='Bob', avg(age)=5.0)]
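A hedged variant for our requirement (not in the original post) using agg() with an explicit alias for the result column; the cast is included because, as the Note below explains, the raw column is read as a string. TotalRevenue is a name chosen for illustration.

# Hedged sketch: groupBy().agg() with an aliased sum column.
from pyspark.sql import functions as F

df.withColumn("MonthlyCharges", F.col("MonthlyCharges").cast("int")) \
  .groupBy("Mode") \
  .agg(F.sum("MonthlyCharges").alias("TotalRevenue")) \
  .show()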
Reference

1. SQL Query - GroupBy
2. pyspark.sql.DataFrame.groupBy

Note
The "MonthlyCharges" column, as read from the text file, is of type String, on which we cannot perform aggregation:

>>> df.groupBy('Mode').sum('MonthlyCharges').collect()
AnalysisException: "MonthlyCharges" is not a numeric column. Aggregation function can only be applied on a numeric column.

In [23]:
from pyspark.sql.functions import col
df = df.withColumn("MonthlyCharges", col("MonthlyCharges").cast("int"))

In [25]:
df.groupBy('Mode').sum('MonthlyCharges').show()

+--------+-------------------+
|    Mode|sum(MonthlyCharges)|
+--------+-------------------+
|PostPaid|               8638|
| PrePaid|              10364|
+--------+-------------------+
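Since the Reference above points to the SQL GROUP BY syntax, here is a hedged sketch (not in the original post) of the same aggregation expressed as a Spark SQL query; "telecom" and TotalRevenue are names chosen for illustration.

# Hedged sketch: the same aggregation as a SQL query over a temporary view.
df.createOrReplaceTempView("telecom")
spark.sql("""
    SELECT Mode, SUM(MonthlyCharges) AS TotalRevenue
    FROM telecom
    GROUP BY Mode
""").show()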
3: Using PySpark Pandas

In [26]:
from pyspark import pandas as ppd
df_pandas = ppd.read_csv('../in/TelecomData.csv',
                         names = ['CustomerID', 'MobileNumber', 'Gender', 'SeniorCitizen',
                                  'Mode', 'Calls', 'SMS', 'InternetServiceStatus',
                                  'MonthlyCharges', 'CustomerChurn'])

In [29]:
df_pandas.groupby('Mode')['MonthlyCharges'].sum()

Out[29]:
Mode
PostPaid     8638
PrePaid     10364
Name: MonthlyCharges, dtype: int64
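One more hedged sketch (an assumption, not from the original post): a pandas-on-Spark result can be brought back into the Spark SQL world when it needs to be joined or written with the DataFrame API; revenue is a name chosen for illustration.

# Hedged sketch: convert the grouped result back to a Spark SQL DataFrame.
revenue = df_pandas.groupby('Mode')['MonthlyCharges'].sum().reset_index()
revenue.to_spark().show()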
Download Code and Data