pyspark.Accumulator
A shared variable that can be accumulated, i.e., one that has a commutative and associative "add" operation. Worker tasks on a Spark cluster can add values to an Accumulator with the += operator, but only the driver program is allowed to access its value, using value. Updates from the workers are propagated automatically to the driver program.

Accumulators provide a way of aggregating values on worker nodes and sending the final value back to the driver program. They are typically used to count events that occur during job execution (for example, for debugging) and to implement counters similar to MapReduce counters.

Limitation: if an executor fails for any reason, its tasks are re-executed from the beginning, so the accumulated count or sum can end up inconsistent (over-counted).

Requirement: count the empty lines in a data set distributed across multiple worker nodes.
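A minimal sketch of the API before the worked example (assuming an active SparkContext named sc; the RDD and values here are illustrative only). Note that Accumulator implements __iadd__, so acc += x also works inside a named function, while a lambda must call acc.add(x) because lambdas cannot contain statements:

# Accumulator created on the driver with an initial value of 0
acc = sc.accumulator(0)

# Tasks running on the workers can only add to it
sc.parallelize([1, 2, 3, 4, 5]).foreach(lambda x: acc.add(x))

# Only the driver may read the accumulated value
print(acc.value)  # 15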
In [4]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
23/02/06 18:22:20 WARN Utils: Your hostname, ashish-Lenovo-ideapad-130-15IKB resolves to a loopback address: 127.0.1.1; using 192.168.1.108 instead (on interface wlp2s0) 23/02/06 18:22:20 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/02/06 18:22:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
In [5]:
# Create the RDD from the raw text file
TelecomRDD = sc.textFile("../in/TelecomDataAccumulators.csv")
# Create and initialize the accumulator that will count blank lines in the file
blankLinesCounter = sc.accumulator(0)
# Check each line and increment the counter for blank lines
TelecomRDD.foreach(lambda line: blankLinesCounter.add(1) if len(line) == 0 else None)
print("Empty Lines: " + str(blankLinesCounter.value))
Empty Lines: 8
In [14]:
TelecomRDD.count()
Out[14]:
508
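As a cross-check (a sketch reusing the TelecomRDD defined above), the same blank-line count can be obtained with a plain filter and no accumulator; the raw file has 508 lines, of which 8 are blank:

# Count blank lines with a transformation + action instead of an accumulator
blanks = TelecomRDD.filter(lambda line: len(line) == 0).count()
print(blanks)                       # expected 8, matching the accumulator
print(TelecomRDD.count() - blanks)  # expected 500 non-blank lines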
Attempt to read the file containing blank lines as a SQL DataFrame and a pyspark.pandas DataFrame, and log any errors
In [7]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
In [8]:
df = spark.read.option("header", False).csv('../in/TelecomDataAccumulators.csv')
In [12]:
df.head()
Out[12]:
Row(_c0='TXCUST00001', _c1='982120000', _c2='Male', _c3='N', _c4='PrePaid', _c5='Active', _c6='Active', _c7='InActive', _c8='20', _c9='N')
In [13]:
df.count()
Out[13]:
500
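The CSV reader reports 500 rows against the 508 raw lines counted on the RDD, so the 8 blank lines were silently dropped. A quick comparison of the two counts (a sketch reusing the objects created above):

raw_lines = TelecomRDD.count()  # 508, blank lines included
data_rows = df.count()          # 500, the csv reader dropped the blanks
print(raw_lines - data_rows)    # 8, matching the accumulator result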
Read operations that produce SQL DataFrames or pyspark.pandas DataFrames drop blank lines by default
In [18]:
from pyspark import pandas as ppd
df_pandas = ppd.read_csv('../in/TelecomDataAccumulators.csv',
names = ['CustomerID', 'MobileNumber', 'Gender', 'SeniorCitizen', 'Mode', 'Calls', 'SMS', 'InternetServiceStatus', 'MonthlyCharges', 'CustomerChurn'])
In [19]:
df_pandas.shape
Out[19]:
(500, 10)
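The pandas-on-Spark frame agrees: 500 rows and 10 columns, so the blank lines were dropped here as well. A small sketch confirming the row count through both APIs (reusing the df_pandas created above):

print(len(df_pandas))                # 500 rows in the pandas-on-Spark frame
print(df_pandas.to_spark().count())  # 500 rows after converting back to a Spark DataFrame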