Monday, February 6, 2023

Demonstrating count(), take() and collect() for printing contents of a PySpark RDD

Contents of the file "links.csv":

https://survival8.blogspot.com/2020/03/the-train-and-wheelbarrow-lesson-from.html
https://survival8.blogspot.com/2020/03/yes-bank-to-be-dropped-from-nifty50.html
https://survival8.blogspot.com/2020/03/coronavirus-disease-covid-19-advice-for.html

PySpark Python Code:

from time import time

from bs4 import BeautifulSoup
from urllib.request import urlopen

from pyspark import SparkContext

sc = SparkContext()

start_time = time()

url_list_path = '/home/ashish/Desktop/links.csv'

urls_lines = sc.textFile(url_list_path)

def processRecord(url):
    # Fetch the page at the given URL and return [url, prettified HTML].
    # Blank lines in the input file are mapped to ["NA", "NA"].
    if len(url) > 0:
        page = urlopen(url)
        soup = BeautifulSoup(page, features="lxml")
        rtnVal = soup.prettify()
    else:
        url = "NA"
        rtnVal = "NA"
    return [url, rtnVal]

# map() is a lazy transformation: no page is fetched until an action
# (count(), take() or collect()) is invoked on the RDD.
temp = urls_lines.map(processRecord)

# count() is an action: it triggers a job that runs processRecord on
# every line of the input file and returns the number of elements.
print("Number of rows: {}".format(temp.count()))

# take(n) is an action that returns the first n elements to the driver.
print("Print using .take(2)")
for i in temp.take(2):
    print(i[0])

print("Print using .take(3)")
for i in temp.take(3):
    print(i[0])

# collect() brings the entire RDD back to the driver as a Python list;
# use it with care on large datasets.
print("Print using .collect()")
temp_rdd = temp.collect()
for elem in temp_rdd:
    print(elem[0])

print("Time taken: " + str(time() - start_time)) 

Logs:

(base) ashish@ashish-vBox:~/Desktop/workspace/VisualStudioCode$ /usr/local/spark/bin/spark-submit --master local /home/ashish/Desktop/spark_script_2.py 100
2020-03-18 22:35:10,382 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2020-03-18 22:35:11,265 INFO spark.SparkContext: Running Spark version 2.4.4
2020-03-18 22:35:11,304 INFO spark.SparkContext: Submitted application: spark_script_2.py
...
2020-03-18 22:35:31,008 INFO scheduler.DAGScheduler: ResultStage 0 (count at /home/ashish/Desktop/spark_script_2.py:29) finished in 16.794 s
2020-03-18 22:35:31,021 INFO scheduler.DAGScheduler: Job 0 finished: count at /home/ashish/Desktop/spark_script_2.py:29, took 16.903808 s
Number of rows: 3
Print using .take(2)
...
2020-03-18 22:35:44,025 INFO scheduler.DAGScheduler: Job 1 finished: runJob at PythonRDD.scala:153, took 12.928439 s
https://survival8.blogspot.com/2020/03/the-train-and-wheelbarrow-lesson-from.html
https://survival8.blogspot.com/2020/03/yes-bank-to-be-dropped-from-nifty50.html
Print using .take(3)
2020-03-18 22:35:44,055 INFO spark.SparkContext: Starting job: runJob at PythonRDD.scala:153
... 
https://survival8.blogspot.com/2020/03/the-train-and-wheelbarrow-lesson-from.html
https://survival8.blogspot.com/2020/03/yes-bank-to-be-dropped-from-nifty50.html
https://survival8.blogspot.com/2020/03/coronavirus-disease-covid-19-advice-for.html
Print using .collect() 
2020-03-18 22:35:57,159 INFO spark.SparkContext: Starting job: collect at /home/ashish/Desktop/spark_script_2.py:40
... 
https://survival8.blogspot.com/2020/03/the-train-and-wheelbarrow-lesson-from.html
https://survival8.blogspot.com/2020/03/yes-bank-to-be-dropped-from-nifty50.html
https://survival8.blogspot.com/2020/03/coronavirus-disease-covid-19-advice-for.html 
Time taken: 54.933162450790405 
2020-03-18 22:36:08,120 INFO spark.SparkContext: Invoking stop() from shutdown hook
...
(base) ashish@ashish-vBox:~/Desktop/workspace/VisualStudioCode$ 

Google Drive Link to code:
https://drive.google.com/open?id=1MXgDzDCVjpqgyLLiVIzcwUHdawhwd_ma

# Useful links
# https://stackoverflow.com/questions/24677180/how-do-i-select-a-range-of-elements-in-spark-rdd
Tags: Spark, Technology
