What Are Transformations?
In Spark, the core data structures are immutable, meaning they cannot be changed once created. This might seem like a strange concept at first: if you cannot change it, how are you supposed to use it? To "change" a DataFrame, you instruct Spark how you would like to modify the DataFrame you have into the one that you want. These instructions are called transformations, and they are the core of how you express your business logic with Spark. There are two types of transformations: those that specify narrow dependencies and those that specify wide dependencies.
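As a minimal sketch (assuming a SparkSession is available; the names here are illustrative, not from the notebook below), a transformation only describes the result you want, while the original DataFrame stays untouched until an action runs:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df = spark.range(10)              # a DataFrame of ids 0..9
evens = df.where("id % 2 = 0")    # a transformation: nothing is computed yet, df is unchanged
evens.show()                      # an action finally triggers the computation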
What Are Narrow Dependencies?
Transformations consisting of narrow dependencies (we'll call them narrow transformations) are those where each input partition contributes to only one output partition.
What Are Wide Dependencies?
A wide dependency (or wide transformation) style transformation will have input partitions contributing to many output partitions. You will often hear this referred to as a shuffle, where Spark exchanges partitions across the cluster. With narrow transformations, Spark automatically performs an operation called pipelining: if we specify multiple filters on DataFrames, they will all be performed in memory. The same cannot be said for shuffles. When we perform a shuffle, Spark writes the results to disk. You will see lots of talk about shuffle optimization across the web because it is an important topic, but for now all you need to understand is that there are two kinds of transformations.
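As a rough sketch of the difference (assuming the sc SparkContext created in the cells below; the numbers and key function are illustrative), a chain of filter and map is narrow and can be pipelined, while reduceByKey is wide and forces a shuffle:

nums = sc.parallelize(range(10))

# Narrow: each input partition feeds exactly one output partition, so Spark pipelines these in memory.
narrow = nums.filter(lambda x: x % 2 == 0).map(lambda x: x * 10)

# Wide: values sharing a key can live in different input partitions, so Spark shuffles data across the cluster.
wide = nums.map(lambda x: (x % 3, x)).reduceByKey(lambda a, b: a + b)

narrow.collect()   # [0, 20, 40, 60, 80]
wide.collect()     # e.g. [(0, 18), (1, 12), (2, 15)] -- key order may vary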
pyspark.RDD.flatMap
Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results. (spark.apache.org/docs)
On a side note, flatMap() is a narrow transformation:
- Data computation happens within the same partition of the parent RDD.
- Data does not get shuffled across the RDD partitions.
- Examples are map(), flatMap(), filter().
The other type of transformation is the wide transformation:
- Records that need to be computed are scattered across the partitions of the parent RDD.
- Data gets shuffled while processing.
- Examples are groupByKey(), reduceByKey().
In [2]:
from pyspark import SparkContext
# Reuse an existing SparkContext if one is already running, otherwise create one.
sc = SparkContext.getOrCreate()
In [20]:
# flatMap applies range(1, x) to each element (2 -> [1], 3 -> [1, 2], 4 -> [1, 2, 3]) and flattens the results.
rdd = sc.parallelize([2, 3, 4])
sorted(rdd.flatMap(lambda x: range(1, x)).collect())
Out[20]:
[1, 1, 1, 2, 2, 3]
In [19]:
# Each element becomes two pairs, and flatMap flattens them into a single list.
sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
Out[19]:
[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
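For contrast (this cell is not in the original notebook), passing the same lambda to map() keeps one nested list per element instead of flattening:

rdd.map(lambda x: [(x, x), (x, x)]).collect()
# [[(2, 2), (2, 2)], [(3, 3), (3, 3)], [(4, 4), (4, 4)]]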
In [3]:
# Read the text file into an RDD with one element per line.
input = sc.textFile("../in/lorem_ipsum.txt")
In [4]:
input.collect()
Out[4]:
['Lorem ipsum dolor sit amet', 'consectetur adipiscing elit', 'sed do eiusmod tempor incididunt ut labore et dolore magna aliqua', 'Lorem ipsum dolor sit amet', 'consectetur adipiscing elit']
In [5]:
# Transformation statement to split up and flatten into words.
words = input.flatMap(lambda line: line.split(" "))
In [7]:
words.collect()
Out[7]:
['Lorem', 'ipsum', 'dolor', 'sit', 'amet', 'consectetur', 'adipiscing', 'elit', 'sed', 'do', 'eiusmod', 'tempor', 'incididunt', 'ut', 'labore', 'et', 'dolore', 'magna', 'aliqua', 'Lorem', 'ipsum', 'dolor', 'sit', 'amet', 'consectetur', 'adipiscing', 'elit']
In [8]:
# Transformation statement to create (word, 1) key/value pairs.
counts = words.map(lambda word: (word, 1))
In [10]:
counts.collect()
Out[10]:
[('Lorem', 1), ('ipsum', 1), ('dolor', 1), ('sit', 1), ('amet', 1), ('consectetur', 1), ('adipiscing', 1), ('elit', 1), ('sed', 1), ('do', 1), ('eiusmod', 1), ('tempor', 1), ('incididunt', 1), ('ut', 1), ('labore', 1), ('et', 1), ('dolore', 1), ('magna', 1), ('aliqua', 1), ('Lorem', 1), ('ipsum', 1), ('dolor', 1), ('sit', 1), ('amet', 1), ('consectetur', 1), ('adipiscing', 1), ('elit', 1)]
In [13]:
# Transformation statement to aggregate the values of the same key.
wordcount = counts.reduceByKey(lambda a, b: a + b)
# Action statement that brings the results back to the driver.
wordcount.collect()
Out[13]:
[('Lorem', 2), ('ipsum', 2), ('amet', 2), ('consectetur', 2), ('sed', 1), ('do', 1), ('labore', 1), ('magna', 1), ('dolor', 2), ('sit', 2), ('adipiscing', 2), ('elit', 2), ('eiusmod', 1), ('tempor', 1), ('incididunt', 1), ('ut', 1), ('et', 1), ('dolore', 1), ('aliqua', 1)]
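To connect this back to wide dependencies: reduceByKey shuffles pairs with the same key into the same partition, so the lineage of wordcount contains a shuffle boundary. As a quick check (the exact output format varies by Spark version), you can print the lineage and look for the shuffled stage:

# Print the RDD lineage; the shuffle introduced by reduceByKey appears as a separate stage.
print(wordcount.toDebugString().decode())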