Web Scraping using PySpark (standalone mode) and BeautifulSoup


This is a demonstration of web scraping on a PySpark standalone cluster (i.e., a cluster with only one node).

Our PySpark installation is at:
/usr/local/spark

Our input file "links.csv" is at:
/home/ashish/Desktop/links.csv

The contents of the "links.csv" file are:
https://survival8.blogspot.com/2020/03/the-train-and-wheelbarrow-lesson-from.html
https://survival8.blogspot.com/2020/03/yes-bank-to-be-dropped-from-nifty50.html
https://survival8.blogspot.com/2020/03/coronavirus-disease-covid-19-advice-for.html

Our Python code is in the file "spark_script_1.py", located at:
/home/ashish/Desktop/spark_script_1.py

Code:
from time import time

from bs4 import BeautifulSoup
from urllib.request import urlopen

from pyspark import SparkContext

sc = SparkContext()

start_time = time()

url_list_path = '/home/ashish/Desktop/links.csv'

# Read the links file as an RDD with one URL per line.
urls_lines = sc.textFile(url_list_path)

def processRecord(url):
    # For a non-empty line, download the page and return the pair
    # [url, prettified HTML]; empty lines map to ["NA", "NA"].
    if len(url) > 0:
        page = urlopen(url)
        soup = BeautifulSoup(page, features="lxml")
        rtnVal = soup.prettify()
    else:
        url = "NA"
        rtnVal = "NA"
    return [url, rtnVal]

# map() is a lazy transformation; no page is fetched yet.
temp = urls_lines.map(processRecord)

# collect() is the action that triggers the job and returns all
# [url, html] pairs to the driver as a Python list.
results = temp.collect()

for elem in results:
    print(elem)

print("Time taken: " + str(time() - start_time))
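Note that processRecord assumes every fetch succeeds; a single unreachable URL would raise an exception and fail the Spark task processing that partition. A more defensive variant (a sketch only, not the code used in the run below; the 10-second timeout is an arbitrary choice) could look like this:

from urllib.request import urlopen
from bs4 import BeautifulSoup

def processRecordSafe(url):
    if len(url) == 0:
        return ["NA", "NA"]
    try:
        # Bound the wait so one slow server cannot hang the task.
        page = urlopen(url, timeout=10)
        soup = BeautifulSoup(page, features="lxml")
        return [url, soup.prettify()]
    except Exception as e:
        # Broad catch: record the failure instead of letting the
        # exception kill the whole Spark task.
        return [url, "ERROR: " + str(e)]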

To execute the program (the trailing 100 is a command-line argument that the script never reads):
$ /usr/local/spark/bin/spark-submit --master local /home/ashish/Desktop/spark_script_1.py 100
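Note that "--master local" runs Spark with a single worker thread, so the three pages here are fetched sequentially within one partition. As a sketch (assuming the machine has spare cores), the fetches could be parallelized by requesting more threads and repartitioning the RDD:

$ /usr/local/spark/bin/spark-submit --master local[4] /home/ashish/Desktop/spark_script_1.py

# In the script, spread the URLs across four partitions so up to
# four pages can be downloaded at the same time:
urls_lines = sc.textFile(url_list_path).repartition(4)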

Logs:
(base) ashish@ashish-vBox:~/Desktop$ /usr/local/spark/bin/spark-submit --master local /home/ashish/Desktop/spark_script_1.py 100

2020-03-18 09:46:08,055 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2020-03-18 09:46:08,990 INFO spark.SparkContext: Running Spark version 2.4.4
2020-03-18 09:46:09,021 INFO spark.SparkContext: Submitted application: spark_script_1.py
2020-03-18 09:46:09,099 INFO spark.SecurityManager: Changing view acls to: ashish
2020-03-18 09:46:09,099 INFO spark.SecurityManager: Changing modify acls to: ashish
2020-03-18 09:46:09,099 INFO spark.SecurityManager: Changing view acls groups to: 
2020-03-18 09:46:09,099 INFO spark.SecurityManager: Changing modify acls groups to: 
2020-03-18 09:46:09,099 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(ashish); groups with view permissions: Set(); users  with modify permissions: Set(ashish); groups with modify permissions: Set()
2020-03-18 09:46:09,485 INFO util.Utils: Successfully started service 'sparkDriver' on port 35827.
2020-03-18 09:46:09,524 INFO spark.SparkEnv: Registering MapOutputTracker
2020-03-18 09:46:09,558 INFO spark.SparkEnv: Registering BlockManagerMaster
2020-03-18 09:46:09,561 INFO storage.BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
2020-03-18 09:46:09,562 INFO storage.BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
2020-03-18 09:46:09,576 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-379a4900-dcb9-407f-8e58-7745b9fe938b
2020-03-18 09:46:09,595 INFO memory.MemoryStore: MemoryStore started with capacity 366.3 MB
2020-03-18 09:46:09,622 INFO spark.SparkEnv: Registering OutputCommitCoordinator
2020-03-18 09:46:09,747 INFO util.log: Logging initialized @2984ms
2020-03-18 09:46:09,879 INFO server.Server: jetty-9.3.z-SNAPSHOT, build timestamp: unknown, git hash: unknown
2020-03-18 09:46:09,902 INFO server.Server: Started @3140ms
2020-03-18 09:46:09,940 INFO server.AbstractConnector: Started ServerConnector@2eb8147f{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2020-03-18 09:46:09,942 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
2020-03-18 09:46:10,012 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6e3405d6{/jobs,null,AVAILABLE,@Spark}
2020-03-18 09:46:10,014 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@3c871004{/jobs/json,null,AVAILABLE,@Spark}
2020-03-18 09:46:10,016 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@3756e90f{/jobs/job,null,AVAILABLE,@Spark}
2020-03-18 09:46:10,019 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@df3aa3a{/jobs/job/json,null,AVAILABLE,@Spark}
2020-03-18 09:46:10,021 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@9709cb0{/stages,null,AVAILABLE,@Spark}
2020-03-18 09:46:10,026 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@96e0200{/stages/json,null,AVAILABLE,@Spark}
2020-03-18 09:46:10,027 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6c2ea226{/stages/stage,null,AVAILABLE,@Spark}
2020-03-18 09:46:10,030 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@80383fe{/stages/stage/json,null,AVAILABLE,@Spark}
2020-03-18 09:46:10,032 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@11252f4a{/stages/pool,null,AVAILABLE,@Spark}
2020-03-18 09:46:10,033 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@e99e1e2{/stages/pool/json,null,AVAILABLE,@Spark}
2020-03-18 09:46:10,035 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@58c5eb7c{/storage,null,AVAILABLE,@Spark}
2020-03-18 09:46:10,037 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4ccd9fe9{/storage/json,null,AVAILABLE,@Spark}
2020-03-18 09:46:10,040 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@74aaa645{/storage/rdd,null,AVAILABLE,@Spark}
2020-03-18 09:46:10,041 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@35fddfc3{/storage/rdd/json,null,AVAILABLE,@Spark}
2020-03-18 09:46:10,042 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@42944c0b{/environment,null,AVAILABLE,@Spark}
2020-03-18 09:46:10,044 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6206b167{/environment/json,null,AVAILABLE,@Spark}
2020-03-18 09:46:10,047 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@433fabec{/executors,null,AVAILABLE,@Spark}
2020-03-18 09:46:10,048 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@18e7778f{/executors/json,null,AVAILABLE,@Spark}
2020-03-18 09:46:10,049 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7873948c{/executors/threadDump,null,AVAILABLE,@Spark}
2020-03-18 09:46:10,050 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2fa99f6c{/executors/threadDump/json,null,AVAILABLE,@Spark}
2020-03-18 09:46:10,061 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@11f731be{/static,null,AVAILABLE,@Spark}
2020-03-18 09:46:10,062 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@10c135f0{/,null,AVAILABLE,@Spark}
2020-03-18 09:46:10,064 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@28b6ed28{/api,null,AVAILABLE,@Spark}
2020-03-18 09:46:10,065 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5e751d68{/jobs/job/kill,null,AVAILABLE,@Spark}
2020-03-18 09:46:10,067 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5721e050{/stages/stage/kill,null,AVAILABLE,@Spark}
2020-03-18 09:46:10,071 INFO ui.SparkUI: Bound SparkUI to 0.0.0.0, and started at http://ashish-vBox:4040
2020-03-18 09:46:10,218 INFO executor.Executor: Starting executor ID driver on host localhost
2020-03-18 09:46:10,313 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 33691.
2020-03-18 09:46:10,314 INFO netty.NettyBlockTransferService: Server created on ashish-vBox:33691
2020-03-18 09:46:10,316 INFO storage.BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
2020-03-18 09:46:10,352 INFO storage.BlockManagerMaster: Registering BlockManager BlockManagerId(driver, ashish-vBox, 33691, None)
2020-03-18 09:46:10,358 INFO storage.BlockManagerMasterEndpoint: Registering block manager ashish-vBox:33691 with 366.3 MB RAM, BlockManagerId(driver, ashish-vBox, 33691, None)
2020-03-18 09:46:10,361 INFO storage.BlockManagerMaster: Registered BlockManager BlockManagerId(driver, ashish-vBox, 33691, None)
2020-03-18 09:46:10,363 INFO storage.BlockManager: Initialized BlockManager: BlockManagerId(driver, ashish-vBox, 33691, None)
2020-03-18 09:46:10,595 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1f6fce92{/metrics/json,null,AVAILABLE,@Spark}
2020-03-18 09:46:11,494 INFO memory.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 236.7 KB, free 366.1 MB)
2020-03-18 09:46:11,573 INFO memory.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 22.9 KB, free 366.0 MB)
2020-03-18 09:46:11,577 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on ashish-vBox:33691 (size: 22.9 KB, free: 366.3 MB)
2020-03-18 09:46:11,585 INFO spark.SparkContext: Created broadcast 0 from textFile at NativeMethodAccessorImpl.java:0
2020-03-18 09:46:11,746 INFO mapred.FileInputFormat: Total input paths to process : 1
2020-03-18 09:46:11,850 INFO spark.SparkContext: Starting job: collect at /home/ashish/Desktop/spark_script_1.py:30
2020-03-18 09:46:11,879 INFO scheduler.DAGScheduler: Got job 0 (collect at /home/ashish/Desktop/spark_script_1.py:30) with 1 output partitions
2020-03-18 09:46:11,880 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (collect at /home/ashish/Desktop/spark_script_1.py:30)
2020-03-18 09:46:11,882 INFO scheduler.DAGScheduler: Parents of final stage: List()
2020-03-18 09:46:11,885 INFO scheduler.DAGScheduler: Missing parents: List()
2020-03-18 09:46:11,895 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (PythonRDD[2] at collect at /home/ashish/Desktop/spark_script_1.py:30), which has no missing parents
2020-03-18 09:46:11,932 INFO memory.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 6.3 KB, free 366.0 MB)
2020-03-18 09:46:11,938 INFO memory.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.0 KB, free 366.0 MB)
2020-03-18 09:46:11,941 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on ashish-vBox:33691 (size: 4.0 KB, free: 366.3 MB)
2020-03-18 09:46:11,955 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1161
2020-03-18 09:46:11,993 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (PythonRDD[2] at collect at /home/ashish/Desktop/spark_script_1.py:30) (first 15 tasks are for partitions Vector(0))
2020-03-18 09:46:11,995 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
2020-03-18 09:46:12,099 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 7895 bytes)
2020-03-18 09:46:12,109 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
2020-03-18 09:46:12,216 INFO rdd.HadoopRDD: Input split: file:/home/ashish/Desktop/links.csv:0+245
2020-03-18 09:46:17,891 INFO python.PythonRunner: Times: total = 5639, boot = 422, init = 156, finish = 5061
2020-03-18 09:46:17,934 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 300249 bytes result sent to driver
2020-03-18 09:46:17,971 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 5892 ms on localhost (executor driver) (1/1)
2020-03-18 09:46:17,980 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
2020-03-18 09:46:17,988 INFO python.PythonAccumulatorV2: Connected to AccumulatorServer at host: 127.0.0.1 port: 45805
2020-03-18 09:46:18,007 INFO scheduler.DAGScheduler: ResultStage 0 (collect at /home/ashish/Desktop/spark_script_1.py:30) finished in 6.081 s
2020-03-18 09:46:18,027 INFO scheduler.DAGScheduler: Job 0 finished: collect at /home/ashish/Desktop/spark_script_1.py:30, took 6.175831 s
['https://survival8.blogspot.com/2020/03/the-train-and-wheelbarrow-lesson-from.html', '<!DOCTYPE html>\n<html>...</html>']
['https://survival8.blogspot.com/2020/03/yes-bank-to-be-dropped-from-nifty50.html', '<!DOCTYPE html>\n<html>...</html>']
['https://survival8.blogspot.com/2020/03/coronavirus-disease-covid-19-advice-for.html', '<!DOCTYPE html>\n<html>...</html>']
Time taken: 7.360840082168579
2020-03-18 09:46:18,225 INFO spark.SparkContext: Invoking stop() from shutdown hook
2020-03-18 09:46:18,238 INFO server.AbstractConnector: Stopped Spark@2eb8147f{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2020-03-18 09:46:18,241 INFO ui.SparkUI: Stopped Spark web UI at http://ashish-vBox:4040
2020-03-18 09:46:18,268 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
2020-03-18 09:46:18,303 INFO memory.MemoryStore: MemoryStore cleared
2020-03-18 09:46:18,305 INFO storage.BlockManager: BlockManager stopped
2020-03-18 09:46:18,338 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
2020-03-18 09:46:18,346 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
2020-03-18 09:46:18,352 INFO spark.SparkContext: Successfully stopped SparkContext
2020-03-18 09:46:18,353 INFO util.ShutdownHookManager: Shutdown hook called
2020-03-18 09:46:18,355 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-ff834ab8-0025-4b95-9b85-cf379a57b580
2020-03-18 09:46:18,358 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-2d5cfecd-0e9e-4554-b7d9-6662f857273d/pyspark-53796bf4-15d2-4348-a213-78eaae4f7484
2020-03-18 09:46:18,361 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-2d5cfecd-0e9e-4554-b7d9-6662f857273d 
(base) ashish@ashish-vBox:~/Desktop$ 
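One caveat visible in the log: the executor sent a 300249-byte result to the driver for just three pages. Because collect() materializes every [url, html] pair in driver memory, a large links file could exhaust the driver. As a sketch (the output directory is a hypothetical path), the results could instead be written out by the executors:

# Write one tab-separated "url<TAB>html" line per page from the
# executors, avoiding a collect() of all page content on the driver.
temp.map(lambda pair: pair[0] + "\t" + pair[1].replace("\n", " ")) \
    .saveAsTextFile("/home/ashish/Desktop/scraped_pages")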
