Saturday, March 9, 2024

5 Questions on PySpark Technology

Q1 of 5 

Which of the below Spark Core API is used to load the retail.csv file and create RDD? 

retailRDD = sc.readFile("/HDFSPATH/retail.csv") 

retailRDD = sc.parallelize("/HDFSPATH/retail.csv") 

retailRDD = sc.textFile("/HDFSPATH/retail.csv") *** 

retailRDD = sc.createFile("/HDFSPATH/retail.csv") 

Q2 of 5 

Shane works in data analytics project and needs to process Users event data (UserLogs.csv file). Which of the below code snippet can be used to split the fields with a comma as a delimiter and fetch only the first two fields from it? 

logsRDD = sc.textFile("/HDFSPATH/UserLogs.csv"); 
FieldsRDD = logsRDD.map(lambda r : r.split(",")).map(lambda r: (r[0],r[1])) *** 

logsRDD = sc.parallelize("/HDFSPATH/UserLogs.csv"); 
FieldsRDD = logsRDD.map(lambda r : r.split(",")).map(lambda r: (r[0],r[1])) 

logsRDD = sc.parallelize("/HDFSPATH/UserLogs.csv"); 
FieldsRDD = logsRDD.filter(lambda r : r.split(",")).map(lambda r: (r[0],r[1])) 

logsRDD = sc.textFile("/HDFSPATH/UserLogs.csv"); 
FieldsRDD = logsRDD.filter(lambda r : r.split(",")).map(lambda r: (r[0],r[1])) 

Q3 of 5

Consider a retail scenario where a paired RDD exists with data (ProductName, Price). Price value must be reduced by 500 as a customer discount. Which paired RDD function in spark can be used for this requirement? 

mapValues() 

keys() 

values() 

map() 

--- mapValues applies the function logic to the value part of the paired RDD without changing the key 

Q4 of 5 
Consider a banking scenario where credit card transaction logs need to be processed. The log contains CustomerID, CustomerName, CreditCard Number, and TransactionAmount fields. Which code snippet below creates a paired RDD ? 

logsRDD = sc.textFile("/HDFSPath/Logs.txt"); 

logsRDD = sc.textFile("/HDFSPath/Logs.txt"); 
LogsPairedRDD = logsRDD.map(lambda r : r.split(",")).map(lambda r: (r[0],int(r[3]))) *** 

logsRDD = sc.textFile("/HDFSPath/Logs.txt"); 
LogsPairedRDD = logsRDD.map(lambda r : r.split(",")).map(lambda r: (r[0],int(r[2]))) 

logsRDD = sc.textFile("/HDFSPath/Logs.txt").map(lambda r: (r[0],int(r[3]))) 

Q5 of 5 

Consider a Spark scenario where an array must be used as a Broadcast variable. Which of the below code snippet is used to access the broadcast variable value? 

bv = sc.broadcast(Array(100,200,300)) 
bv.getValue --- 

bv = sc.broadcast(Array(100,200,300)) 
bv.value 

bv = sc.broadcast(Array(100,200,300)) 
bv.find 

bv = sc.broadcast(Array(100,200,300)) 
bv.fetchValue 

Spark Core Challenges

Business Scenario Arisconn Cars provides rental car service across the globe. To improve their customer service, the client wants to analyze periodically each car’s sensor data to repair faults and problems in the car. Sensor data from cars are streamed through events hub (data ingestion tool) into Hadoop's HDFS (distributed file system) and analyzed using Spark Core programming to find out cars generating maximum errors. This analysis would help Arisconn to send the service team to repair the cars even before they fail. Below is the Schema of the big dataset of Arisconn Cars which holds 10 million records approximately. [sensorID, carID, latitude, longitude, engine_speed, accelerator_pedal_position, vehicle_speed, torque_at_transmission, fuel_level, typeOfMessage, timestamp] typeOfMessage: INFO, WARN, ERR, DEBUG Arisconn has the below set of requirements to be performed against the dataset: Filter fields - Sensor id, Car id, Latitude, Longitude, Vehicle Speed, TypeOfMessage Filter valid records i.e., discard records containing '?' Filter records holding only error messages (ignore warnings and info messages) Apply aggregation to count number of error messages produced by cars Below is the Python code to implement the first three requirements. #Loading a text file in to an RDD Car_Info = sc.textFile("/HDFSPath/ArisconnDataset.txt"); #Referring the header of the file header=Car_Info.first() #Removing header and splitting records with ',' as delimiter and fetching relevant fields Car_temp = Car_Info.filter(lambda record:record!=header).map(lambda r:r.split(",")).map(lambda c:(c[0],c[1],float([2]),float(c[4]),int(c[6]),c[9])); #Filtering only valid records(records not starting with '?'), and f[1] refers to first field (sensorid) Car_Eng_Specs = Car_temp.filter(lambda f:str(f[1]).startswith("?")) #Filtering records holding only error messages and f[6] refers to 6th field (Typeofmessage) Car_Error_logs = Car_Eng_Specs.filter(lambda f:str(f[6]).startswith("ERR")) In the above code, Arisconn's dataset is loaded into RDD (Car_Info) The header of the dataset is removed and only fields (sensorid, carid, latitude, longitude, vehiclespeed, TypeOfMessage) are filtered. Refer to RDD Car_temp Records starting with '?' are removed. Refer to RDD Car_Eng_Specs. Records containing TypeOfMessage = "ERR" get filtered There are few challenges in the above code and even the fourth requirement is too complex to implement in Spark Core. We shall discuss this next.

No comments:

Post a Comment