Saturday, October 8, 2022

Four Ways to Read a CSV in PySpark (v3.3.0)


from pyspark import SparkContext
from pyspark.sql import SQLContext # Entry point for SQL-based DataFrames (the other kind is the Pandas-based DataFrame) and SQL functionality.
# Note: in Spark 3.x, SQLContext is a legacy entry point; SparkSession (see Way 3) is the preferred one.

sc = SparkContext.getOrCreate()
sqlCtx = SQLContext(sc)

import pyspark
print(pyspark.__version__)

3.3.0

Our input data looks like this:

with open('./input/student.csv', mode = 'r', encoding = 'utf8') as f:
    data = f.readlines()

import pandas as pd
df_student = pd.read_csv('./input/student.csv')

data
['sno,FirstName,LASTNAME\n', 'one,Ram,\n', 'two,,Sharma\n', 'three,Shyam,NA\n', 'four,Kabir,\n', 'five,NA,Singh\n']

df_student.head()
     sno FirstName LASTNAME
0    one       Ram      NaN
1    two       NaN   Sharma
2  three     Shyam      NaN
3   four     Kabir      NaN
4   five       NaN    Singh

When you load a CSV into a Pandas DataFrame, blank fields and the string 'NA' are both converted to NaN by default, as shown above.
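
If you would rather keep the raw strings, pandas lets you switch this conversion off; a minimal sketch using the standard keep_default_na parameter:

import pandas as pd

# With keep_default_na=False, pandas stops mapping '' and 'NA' to NaN:
# blank fields come through as empty strings and 'NA' stays a literal string.
df_raw = pd.read_csv('./input/student.csv', keep_default_na = False)
print(df_raw.head())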

Way 1

PySpark's sqlCtx.createDataFrame() also errors out on a Pandas DataFrame with null values: a string column that holds NaN (a float) cannot be mapped to a single Spark type.

df_student = pd.read_csv('./input/student.csv')
sdf = sqlCtx.createDataFrame(df_student)
TypeError: field FirstName: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>

def clean_data(df):
    df.fillna('Not Applicable', inplace = True) # Handles both blank and 'NA' values.
    df = df.apply(lambda x: x.str.strip())
    df.columns = df.columns.str.lower()
    return df

df_student = clean_data(df_student)
sdf = sqlCtx.createDataFrame(df_student)

type(sdf)
pyspark.sql.dataframe.DataFrame

sdf.show()
+-----+--------------+--------------+
|  sno|     firstname|      lastname|
+-----+--------------+--------------+
|  one|           Ram|Not Applicable|
|  two|Not Applicable|        Sharma|
|three|         Shyam|Not Applicable|
| four|         Kabir|Not Applicable|
| five|Not Applicable|         Singh|
+-----+--------------+--------------+
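
An alternative to filling the gaps is to convert the NaN cells to None and hand createDataFrame() an explicit schema. The following is only a sketch of that route; the where(...) trick and the hand-written schema are my additions, not from the original post:

import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType

df_student = pd.read_csv('./input/student.csv')

# Replace float NaN with None so every cell of a string column is either a string or a null.
df_student = df_student.where(pd.notnull(df_student), None)

schema = StructType([
    StructField('sno', StringType(), True),
    StructField('FirstName', StringType(), True),
    StructField('LASTNAME', StringType(), True),
])

sdf = sqlCtx.createDataFrame(df_student, schema = schema)
sdf.show()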

Way 2

New feature in Spark 3.2 [ Ref ]

df = pyspark.pandas.read_csv('./input/student.csv')
# Error if the 'pandas' module is not there in your version of 'pyspark':
# AttributeError: module 'pyspark' has no attribute 'pandas'

from pyspark import pandas as ppd
df_student_pyspark = ppd.read_csv('./input/student.csv')

type(df_student_pyspark)
pyspark.pandas.frame.DataFrame

df_student_pyspark
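
A pandas-on-Spark DataFrame converts back and forth to a plain Spark DataFrame; a small sketch, assuming Spark 3.2+ where both to_spark() and pandas_api() are available:

from pyspark import pandas as ppd

df_student_pyspark = ppd.read_csv('./input/student.csv')

# pandas-on-Spark -> plain Spark DataFrame
sdf = df_student_pyspark.to_spark()
sdf.show()

# plain Spark DataFrame -> pandas-on-Spark
psdf = sdf.pandas_api()
print(type(psdf)) # <class 'pyspark.pandas.frame.DataFrame'>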

Way 3

[ Ref ]

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# A CSV dataset is pointed to by a path.
# The path can be either a single CSV file or a directory of CSV files.
# path = "examples/src/main/resources/people.csv"

df = spark.read.option("header", True).csv('./input/student.csv')
df.show()
+-----+---------+--------+
|  sno|FirstName|LASTNAME|
+-----+---------+--------+
|  one|      Ram|    null|
|  two|     null|  Sharma|
|three|    Shyam|      NA|
| four|    Kabir|    null|
| five|       NA|   Singh|
+-----+---------+--------+

type(df)
pyspark.sql.dataframe.DataFrame
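
Note that, unlike pandas, Spark's CSV reader keeps the literal string 'NA' by default, so the two loaders disagree on what counts as missing. The standard nullValue and inferSchema options close that gap; a minimal sketch:

df = (spark.read
      .option("header", True)
      .option("nullValue", "NA")   # treat the literal string 'NA' as null, like pandas does
      .option("inferSchema", True) # optional: infer column types instead of reading everything as strings
      .csv('./input/student.csv'))
df.printSchema()
df.show()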

Way 4: Using the plain old RDD

Shane works on a data analytics project and needs to process users' event data (a UserLogs.csv file). Which of the code snippets below can be used to split the fields on a comma delimiter and fetch only the first two fields?

logsRDD = sc.textFile("/HDFSPATH/UserLogs.csv")
FieldsRDD = logsRDD.map(lambda r : r.split(",")).map(lambda r: (r[0], r[1]))
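
The same pattern applies to our student.csv; here is a short sketch of my own (not from the original quiz) that strips the header line and promotes the RDD to a DataFrame via toDF():

rdd = sc.textFile('./input/student.csv')

header = rdd.first()                            # 'sno,FirstName,LASTNAME'
rows = (rdd.filter(lambda line: line != header) # drop the header row
           .map(lambda line: line.split(','))) # split each line on commas

df = rows.toDF(header.split(','))               # column names taken from the header
df.show()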
Tags: Technology, Spark
