Join Types
Inner Join The inner join is the default join in Spark SQL. It selects rows that have matching values in both relations. Syntax: relation [ INNER ] JOIN relation [ join_criteria ] Left Join A left join returns all values from the left relation and the matched values from the right relation, or appends NULL if there is no match. It is also referred to as a left outer join. Syntax: relation LEFT [ OUTER ] JOIN relation [ join_criteria ] Right Join A right join returns all values from the right relation and the matched values from the left relation, or appends NULL if there is no match. It is also referred to as a right outer join. Syntax: relation RIGHT [ OUTER ] JOIN relation [ join_criteria ] Full Join A full join returns all values from both relations, appending NULL values on the side that does not have a match. It is also referred to as a full outer join. Syntax: relation FULL [ OUTER ] JOIN relation [ join_criteria ] Cross Join A cross join returns the Cartesian product of two relations. Syntax: relation CROSS JOIN relation [ join_criteria ] Semi Join A semi join returns values from the left side of the relation that has a match with the right. It is also referred to as a left semi join. Syntax: relation [ LEFT ] SEMI JOIN relation [ join_criteria ] Anti Join An anti join returns values from the left relation that has no match with the right. It is also referred to as a left anti join. Syntax: relation [ LEFT ] ANTI JOIN relation [ join_criteria ] Ref: spark.apache.orgVisualizing Using Venn Diagrams
1: Using RDD¶
Spark provides a join() function that can join two paired RDDs based on the same key. join(): Performs an inner join between two RDDs: firstRDD.join(laterRDD) rightOuterJoin(): Performs join operation between two RDDs with key present in first RDD: firstRDD.rightOuterJoin(laterRDD) leftOuterJoin(): Performs join operation between two RDDs with key present in the later RDD: firstRDD.leftOuterJoin(laterRDD) Requirement: Let us consider two different datasets of ArisCCNetwork RouterLocation.tsv and RouterPurchase.tsv. Schema: RouterLocation.tsv: RouterID, Name, Location RouterPurchase.tsv: RouterID, Date, PrimaryMemory, SecondaryMemory, Cost Join these two datasets to fetch Routers Location, Cost, and Memory details into a single RDD. Implementation steps to join Step 1: Create namedtuple classes representing datasets Create two namedtuple representing the schema of each dataset. Note: namedtuple is just like a dictionary. It improves the code readability by providing a way to access the values using descriptive field names. Step 2: Generate <K,V> pairs using namedtuple In this step, datasets are loaded as RDDs Paired RDDs <K, V> are created where K = common column in both RDDs, V = value part which contains a complete record Step 3: Apply the join() function Spark join is applied against the grouped fields of locRDD and purRDD from the previous step.
In [19]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
In [20]:
from collections import namedtuple
RouterLocation = namedtuple('RouterLocation',['rid','name','location'])
RouterPurchase = namedtuple('RouterPurchase',['rid','date','pmemory','smemory','cost'])
In [21]:
#Load RouterLocation dataset and generate Rid(common field),RouterLocation object
locRDD = sc.textFile("../in/RouterLocation.tsv")\
.map(lambda line:line.split("\t"))\
.map(lambda r : (r[0], RouterLocation(int(r[0]), r[1], r[2])))
In [22]:
#Load RouterPurchase dataset and generate Rid(common field),RouterLocation object
purRDD = sc.textFile("../in/RouterPurchase.tsv")\
.map(lambda line:line.split("\t"))\
.map(lambda r : (r[0], RouterPurchase(int(r[0]), r[1], int(r[2]), int(r[3]), r[4])))
In [23]:
locRDD.collect()
Out[23]:
[('1', RouterLocation(rid=1, name='RTR1', location='Chennai')), ('2', RouterLocation(rid=2, name='RTR2', location='Bangalore')), ('3', RouterLocation(rid=3, name='RTR3', location='Pune')), ('4', RouterLocation(rid=4, name='RTR4', location='Delhi')), ('5', RouterLocation(rid=5, name='RTR5', location='Mumbai'))]
In [24]:
result = locRDD.join(purRDD)
In [25]:
result.collect()
Out[25]:
[('4', (RouterLocation(rid=4, name='RTR4', location='Delhi'), RouterPurchase(rid=4, date='9/3/2014', pmemory=653235467, smemory=245913333, cost='1000USD'))), ('3', (RouterLocation(rid=3, name='RTR3', location='Pune'), RouterPurchase(rid=3, date='6/10/2013', pmemory=453232267, smemory=325913333, cost='1200USD'))), ('1', (RouterLocation(rid=1, name='RTR1', location='Chennai'), RouterPurchase(rid=1, date='9/3/2012', pmemory=453232267, smemory=175913333, cost='1000USD'))), ('2', (RouterLocation(rid=2, name='RTR2', location='Bangalore'), RouterPurchase(rid=2, date='9/7/2012', pmemory=453232345, smemory=255913333, cost='1500USD'))), ('5', (RouterLocation(rid=5, name='RTR5', location='Mumbai'), RouterPurchase(rid=5, date='7/7/2014', pmemory=373232267, smemory=465913333, cost='1300USD')))]
2: Using SQL DataFrames¶
In [26]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
In [50]:
df = spark.read.option("header", False).option("delimiter", "\t").csv('../in/RouterLocation.tsv')
# Ref: sql-data-sources-csv
In [51]:
df = df.withColumnRenamed("_c0", "rid")\
.withColumnRenamed("_c1", "name")\
.withColumnRenamed("_c2", "location")
In [52]:
df.show()
+---+----+---------+ |rid|name| location| +---+----+---------+ | 1|RTR1| Chennai| | 2|RTR2|Bangalore| | 3|RTR3| Pune| | 4|RTR4| Delhi| | 5|RTR5| Mumbai| +---+----+---------+
In [43]:
dfp = spark.read.option("header", False).option("delimiter", "\t").csv('../in/RouterPurchase.tsv')
In [44]:
dfp.show()
+---+---------+---------+---------+-------+ |_c0| _c1| _c2| _c3| _c4| +---+---------+---------+---------+-------+ | 1| 9/3/2012|453232267|175913333|1000USD| | 2| 9/7/2012|453232345|255913333|1500USD| | 3|6/10/2013|453232267|325913333|1200USD| | 4| 9/3/2014|653235467|245913333|1000USD| | 5| 7/7/2014|373232267|465913333|1300USD| +---+---------+---------+---------+-------+
In [46]:
dfp = dfp.withColumnRenamed("_c0", "rid")\
.withColumnRenamed("_c1", "date")\
.withColumnRenamed("_c2", "pmemory")\
.withColumnRenamed("_c3", "smemory")\
.withColumnRenamed("_c4", "cost")
In [47]:
dfp.show()
+---+---------+---------+---------+-------+ |rid| date| pmemory| smemory| cost| +---+---------+---------+---------+-------+ | 1| 9/3/2012|453232267|175913333|1000USD| | 2| 9/7/2012|453232345|255913333|1500USD| | 3|6/10/2013|453232267|325913333|1200USD| | 4| 9/3/2014|653235467|245913333|1000USD| | 5| 7/7/2014|373232267|465913333|1300USD| +---+---------+---------+---------+-------+
In [56]:
res = df.join(dfp, df.rid == dfp.rid, how = 'inner')
In [57]:
res.show()
+---+----+---------+---+---------+---------+---------+-------+ |rid|name| location|rid| date| pmemory| smemory| cost| +---+----+---------+---+---------+---------+---------+-------+ | 1|RTR1| Chennai| 1| 9/3/2012|453232267|175913333|1000USD| | 2|RTR2|Bangalore| 2| 9/7/2012|453232345|255913333|1500USD| | 3|RTR3| Pune| 3|6/10/2013|453232267|325913333|1200USD| | 4|RTR4| Delhi| 4| 9/3/2014|653235467|245913333|1000USD| | 5|RTR5| Mumbai| 5| 7/7/2014|373232267|465913333|1300USD| +---+----+---------+---+---------+---------+---------+-------+
how: str, optional default inner. Must be one of: inner, cross, outer, full, fullouter, full_outer, left, leftouter, left_outer, right, rightouter, right_outer, semi, leftsemi, left_semi, anti, leftanti and left_anti.
3: Using PySpark.Pandas¶
In [60]:
from pyspark import pandas as ppd
dfl = ppd.read_csv('../in/RouterLocation.tsv', sep = '\t',
names = ['rid', 'name', 'location'])
/home/ashish/anaconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/pandas/utils.py:975: PandasAPIOnSparkAdviceWarning: If `index_col` is not specified for `read_csv`, the default index is attached which can cause additional overhead. warnings.warn(message, PandasAPIOnSparkAdviceWarning)
In [61]:
dfl
Out[61]:
rid | name | location | |
---|---|---|---|
0 | 1 | RTR1 | Chennai |
1 | 2 | RTR2 | Bangalore |
2 | 3 | RTR3 | Pune |
3 | 4 | RTR4 | Delhi |
4 | 5 | RTR5 | Mumbai |
In [62]:
dfp = ppd.read_csv('../in/RouterPurchase.tsv', sep = '\t',
names = ['rid', 'date', 'pmemory', 'smemory', 'cost'])
/home/ashish/anaconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/pandas/utils.py:975: PandasAPIOnSparkAdviceWarning: If `index_col` is not specified for `read_csv`, the default index is attached which can cause additional overhead. warnings.warn(message, PandasAPIOnSparkAdviceWarning)
In [63]:
dfp
Out[63]:
rid | date | pmemory | smemory | cost | |
---|---|---|---|---|---|
0 | 1 | 9/3/2012 | 453232267 | 175913333 | 1000USD |
1 | 2 | 9/7/2012 | 453232345 | 255913333 | 1500USD |
2 | 3 | 6/10/2013 | 453232267 | 325913333 | 1200USD |
3 | 4 | 9/3/2014 | 653235467 | 245913333 | 1000USD |
4 | 5 | 7/7/2014 | 373232267 | 465913333 | 1300USD |
In [64]:
res = ppd.merge(dfl, dfp, on="rid", how = 'inner')
In [65]:
res
Out[65]:
rid | name | location | date | pmemory | smemory | cost | |
---|---|---|---|---|---|---|---|
0 | 1 | RTR1 | Chennai | 9/3/2012 | 453232267 | 175913333 | 1000USD |
1 | 2 | RTR2 | Bangalore | 9/7/2012 | 453232345 | 255913333 | 1500USD |
2 | 3 | RTR3 | Pune | 6/10/2013 | 453232267 | 325913333 | 1200USD |
3 | 4 | RTR4 | Delhi | 9/3/2014 | 653235467 | 245913333 | 1000USD |
4 | 5 | RTR5 | Mumbai | 7/7/2014 | 373232267 | 465913333 | 1300USD |
In [ ]: