Requirement 1: List all senior citizens who have left a telecom service provider (CustomerChurn flag = Y). Schema: CustomerID, Mobile Number, Gender, SeniorCitizen flag, Mode, Calls, SMS, Internet Service Status, MonthlyCharges (USD), CustomerChurn flag.
In [1]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
1. Using RDD
In [2]:
ChurnRDD = sc.textFile("../in/TelecomData.csv")
In [3]:
# Filter transformation: keep records whose SeniorCitizen (index 3) and CustomerChurn (index 9) flags are both "Y"
FilteredRDD = ChurnRDD.filter(lambda record: record.split(",")[3] == "Y" and record.split(",")[9] == "Y")
In [6]:
FilteredRDD.take(5)
Out[6]:
['TXCUST00009,982120008,Male,Y,PrePaid,Active,Active,InActive,15,Y',
 'TXCUST00186,982120185,Male,Y,PrePaid,Active,Active,Active,80,Y',
 'TXCUST00187,982120186,Male,Y,PrePaid,Active,Active,Active,90,Y',
 'TXCUST00188,982120187,Male,Y,PrePaid,Active,Active,Active,100,Y',
 'TXCUST00189,982120188,Male,Y,PrePaid,Active,Active,Active,65,Y']
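The lambda above splits every record twice. A minimal sketch of one alternative is a named predicate that splits once and guards against short lines. The names `SENIOR_IDX`, `CHURN_IDX` and `is_senior_churner` are hypothetical; the column positions are taken from the schema in the requirement, and the sample records are copied from the notebook output.

```python
# Column positions taken from the schema listed in the requirement (0-based).
SENIOR_IDX = 3   # SeniorCitizen flag
CHURN_IDX = 9    # CustomerChurn flag


def is_senior_churner(record: str) -> bool:
    """Return True when a CSV record is a senior citizen who has churned."""
    fields = record.split(",")  # split once, reuse for both checks
    # Treat short or malformed lines as non-matches instead of raising IndexError.
    if len(fields) <= CHURN_IDX:
        return False
    return fields[SENIOR_IDX] == "Y" and fields[CHURN_IDX] == "Y"


# Records copied from the notebook output in this document.
sample = [
    "TXCUST00009,982120008,Male,Y,PrePaid,Active,Active,InActive,15,Y",
    "TXCUST00004,982120003,Male,Y,PrePaid,Active,Active,InActive,25,N",
]
matches = [r for r in sample if is_senior_churner(r)]
print(matches)
```

Under these assumptions the predicate could be passed straight to the RDD, as in `ChurnRDD.filter(is_senior_churner)`.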
2. Using SQL DataFrame
In [8]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
In [20]:
df = spark.read.option("header", False).csv('../in/TelecomData.csv')
df.show(5)
+-----------+---------+----+---+--------+------+------+--------+---+---+
|        _c0|      _c1| _c2|_c3|     _c4|   _c5|   _c6|     _c7|_c8|_c9|
+-----------+---------+----+---+--------+------+------+--------+---+---+
|TXCUST00001|982120000|Male|  N| PrePaid|Active|Active|InActive| 20|  N|
|TXCUST00002|982120001|Male|  N|PostPaid|Active|Active|InActive| 25|  N|
|TXCUST00003|982120002|Male|  N| PrePaid|Active|Active|InActive| 20|  Y|
|TXCUST00004|982120003|Male|  Y| PrePaid|Active|Active|InActive| 25|  N|
|TXCUST00005|982120004|Male|  N| PrePaid|Active|Active|InActive| 15|  N|
+-----------+---------+----+---+--------+------+------+--------+---+---+
only showing top 5 rows
In [22]:
df = df.withColumnRenamed("_c0", "CustomerID") \
    .withColumnRenamed("_c1", "Mobile Number") \
    .withColumnRenamed("_c2", "Gender") \
    .withColumnRenamed("_c3", "SeniorCitizen") \
    .withColumnRenamed("_c9", "CustomerChurn")
In [23]:
df.show(5)
+-----------+-------------+------+-------------+--------+------+------+--------+---+-------------+
| CustomerID|Mobile Number|Gender|SeniorCitizen|     _c4|   _c5|   _c6|     _c7|_c8|CustomerChurn|
+-----------+-------------+------+-------------+--------+------+------+--------+---+-------------+
|TXCUST00001|    982120000|  Male|            N| PrePaid|Active|Active|InActive| 20|            N|
|TXCUST00002|    982120001|  Male|            N|PostPaid|Active|Active|InActive| 25|            N|
|TXCUST00003|    982120002|  Male|            N| PrePaid|Active|Active|InActive| 20|            Y|
|TXCUST00004|    982120003|  Male|            Y| PrePaid|Active|Active|InActive| 25|            N|
|TXCUST00005|    982120004|  Male|            N| PrePaid|Active|Active|InActive| 15|            N|
+-----------+-------------+------+-------------+--------+------+------+--------+---+-------------+
only showing top 5 rows
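The chained `withColumnRenamed` calls name only five of the ten columns, so `_c4` to `_c8` keep their defaults. A possible alternative, sketched here, is `DataFrame.toDF`, which assigns a new name to every column positionally. The identifiers `COLUMN_NAMES` and `name_columns` are hypothetical, and the names follow the schema in the requirement written without spaces (so `MobileNumber` rather than the `Mobile Number` used in the cell above).

```python
# Column names follow the schema in the requirement; the identifiers
# COLUMN_NAMES and name_columns are illustrative, not from the notebook.
COLUMN_NAMES = [
    "CustomerID", "MobileNumber", "Gender", "SeniorCitizen", "Mode",
    "Calls", "SMS", "InternetServiceStatus", "MonthlyCharges", "CustomerChurn",
]


def name_columns(df):
    """Return a Spark DataFrame with all ten columns named positionally."""
    if len(df.columns) != len(COLUMN_NAMES):
        raise ValueError(
            f"expected {len(COLUMN_NAMES)} columns, got {len(df.columns)}"
        )
    # DataFrame.toDF(*cols) returns a new DataFrame with the given column names.
    return df.toDF(*COLUMN_NAMES)
```

With this helper, the freshly read DataFrame (before any renaming) would be passed in as `df = name_columns(df)`.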
In [29]:
df.filter("SeniorCitizen == 'Y' and CustomerChurn == 'Y'").show(5)
+-----------+-------------+------+-------------+-------+------+------+--------+---+-------------+
| CustomerID|Mobile Number|Gender|SeniorCitizen|    _c4|   _c5|   _c6|     _c7|_c8|CustomerChurn|
+-----------+-------------+------+-------------+-------+------+------+--------+---+-------------+
|TXCUST00009|    982120008|  Male|            Y|PrePaid|Active|Active|InActive| 15|            Y|
|TXCUST00186|    982120185|  Male|            Y|PrePaid|Active|Active|  Active| 80|            Y|
|TXCUST00187|    982120186|  Male|            Y|PrePaid|Active|Active|  Active| 90|            Y|
|TXCUST00188|    982120187|  Male|            Y|PrePaid|Active|Active|  Active|100|            Y|
|TXCUST00189|    982120188|  Male|            Y|PrePaid|Active|Active|  Active| 65|            Y|
+-----------+-------------+------+-------------+-------+------+------+--------+---+-------------+
only showing top 5 rows
In [33]:
# Per the PySpark DataFrame documentation, where() is an alias for filter()
df.where("SeniorCitizen == 'Y' and CustomerChurn == 'Y'").head(5)
Out[33]:
[Row(CustomerID='TXCUST00009', Mobile Number='982120008', Gender='Male', SeniorCitizen='Y', _c4='PrePaid', _c5='Active', _c6='Active', _c7='InActive', _c8='15', CustomerChurn='Y'),
 Row(CustomerID='TXCUST00186', Mobile Number='982120185', Gender='Male', SeniorCitizen='Y', _c4='PrePaid', _c5='Active', _c6='Active', _c7='Active', _c8='80', CustomerChurn='Y'),
 Row(CustomerID='TXCUST00187', Mobile Number='982120186', Gender='Male', SeniorCitizen='Y', _c4='PrePaid', _c5='Active', _c6='Active', _c7='Active', _c8='90', CustomerChurn='Y'),
 Row(CustomerID='TXCUST00188', Mobile Number='982120187', Gender='Male', SeniorCitizen='Y', _c4='PrePaid', _c5='Active', _c6='Active', _c7='Active', _c8='100', CustomerChurn='Y'),
 Row(CustomerID='TXCUST00189', Mobile Number='982120188', Gender='Male', SeniorCitizen='Y', _c4='PrePaid', _c5='Active', _c6='Active', _c7='Active', _c8='65', CustomerChurn='Y')]
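The same filter can also be written as a plain SQL query against a temporary view. The sketch below is one way to do that, assuming the column names produced by the renaming cell above; the view name `telecom` and the helper `senior_churners_sql` are hypothetical. Note that a column name containing a space, such as `Mobile Number`, has to be wrapped in backticks inside Spark SQL.

```python
VIEW_NAME = "telecom"  # hypothetical view name, not from the notebook

# Backticks are needed around `Mobile Number` because the name contains a space.
QUERY = f"""
SELECT CustomerID, `Mobile Number`, SeniorCitizen, CustomerChurn
FROM {VIEW_NAME}
WHERE SeniorCitizen = 'Y' AND CustomerChurn = 'Y'
"""


def senior_churners_sql(spark, df):
    """Register df as a temporary view and run the query through spark.sql."""
    df.createOrReplaceTempView(VIEW_NAME)
    return spark.sql(QUERY)
```

Called as `senior_churners_sql(spark, df).show(5)`, this should list the same customers as the `filter` and `where` cells, restricted to the four selected columns.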
3. Using Pandas on PySpark
In [37]:
from pyspark import pandas as ppd
df_pandas = ppd.read_csv('../in/TelecomData.csv',
                         names=['CustomerID', 'MobileNumber', 'Gender', 'SeniorCitizen', 'Mode', 'Calls', 'SMS', 'InternetServiceStatus', 'MonthlyCharges', 'CustomerChurn'])
/home/ashish/anaconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/pandas/utils.py:975: PandasAPIOnSparkAdviceWarning: If `index_col` is not specified for `read_csv`, the default index is attached which can cause additional overhead. warnings.warn(message, PandasAPIOnSparkAdviceWarning)
In [38]:
df_pandas.head()
Out[38]:
| | CustomerID | MobileNumber | Gender | SeniorCitizen | Mode | Calls | SMS | InternetServiceStatus | MonthlyCharges | CustomerChurn |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | TXCUST00001 | 982120000 | Male | N | PrePaid | Active | Active | InActive | 20 | N |
| 1 | TXCUST00002 | 982120001 | Male | N | PostPaid | Active | Active | InActive | 25 | N |
| 2 | TXCUST00003 | 982120002 | Male | N | PrePaid | Active | Active | InActive | 20 | Y |
| 3 | TXCUST00004 | 982120003 | Male | Y | PrePaid | Active | Active | InActive | 25 | N |
| 4 | TXCUST00005 | 982120004 | Male | N | PrePaid | Active | Active | InActive | 15 | N |
In [43]:
res = df_pandas[(df_pandas['SeniorCitizen'] == 'Y') & (df_pandas['CustomerChurn'] == 'Y')]
In [46]:
res[['CustomerID', 'SeniorCitizen', 'CustomerChurn']].head()
Out[46]:
| | CustomerID | SeniorCitizen | CustomerChurn |
|---|---|---|---|
| 8 | TXCUST00009 | Y | Y |
| 185 | TXCUST00186 | Y | Y |
| 186 | TXCUST00187 | Y | Y |
| 187 | TXCUST00188 | Y | Y |
| 188 | TXCUST00189 | Y | Y |
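Because the pandas API on Spark mirrors pandas, the boolean-mask filter can be checked on a few rows without a Spark session. The sketch below uses plain pandas (an assumption: it is installed alongside PySpark) and six records copied from the outputs above; `SAMPLE_CSV`, `sample` and `res_sample` are hypothetical names. It also selects the three columns in the same `.loc` call instead of a second indexing step.

```python
import io

import pandas as pd

COLUMNS = [
    "CustomerID", "MobileNumber", "Gender", "SeniorCitizen", "Mode",
    "Calls", "SMS", "InternetServiceStatus", "MonthlyCharges", "CustomerChurn",
]

# Six records copied from the notebook outputs in this document.
SAMPLE_CSV = """\
TXCUST00001,982120000,Male,N,PrePaid,Active,Active,InActive,20,N
TXCUST00002,982120001,Male,N,PostPaid,Active,Active,InActive,25,N
TXCUST00003,982120002,Male,N,PrePaid,Active,Active,InActive,20,Y
TXCUST00004,982120003,Male,Y,PrePaid,Active,Active,InActive,25,N
TXCUST00005,982120004,Male,N,PrePaid,Active,Active,InActive,15,N
TXCUST00009,982120008,Male,Y,PrePaid,Active,Active,InActive,15,Y
"""

sample = pd.read_csv(io.StringIO(SAMPLE_CSV), names=COLUMNS)

# Both flags must be "Y": senior citizen AND churned.
mask = (sample["SeniorCitizen"] == "Y") & (sample["CustomerChurn"] == "Y")

# Filter rows and pick the columns of interest in one step.
res_sample = sample.loc[mask, ["CustomerID", "SeniorCitizen", "CustomerChurn"]]
print(res_sample)
```

Only TXCUST00009 satisfies both conditions in this sample: TXCUST00003 churned but is not a senior citizen, and TXCUST00004 is a senior citizen who did not churn.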