Broadcast variables

A broadcast variable sends a large dataset to all the worker nodes so that tasks can use it as a lookup. The data is cached on each machine rather than shipped as a copy with every task. Broadcast variables are read-only.

Requirement: Telecom customer data is distributed across four worker nodes in HDFS. As part of processing this data, the international roaming dataset needs to be used as a lookup. The initial solution created two RDDs and joined them, which led to performance issues because of the large amount of data shuffling.

Solution: Broadcast the roaming dataset to all worker nodes (where the job is executed) and use it as a lookup.
In [2]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
23/02/08 09:13:46 WARN Utils: Your hostname, ashish-Lenovo-ideapad-130-15IKB resolves to a loopback address: 127.0.1.1; using 192.168.1.108 instead (on interface wlp2s0)
23/02/08 09:13:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/02/08 09:13:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/02/08 09:13:48 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/02/08 09:13:48 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
In [3]:
# Create an RDD with the Roaming dataset
IRDD = sc.textFile("../in/BroadCastData.csv")
In [4]:
IRDD.collect()
Out[4]:
['982120000,United States', '982120001,Australia', '982120002,Germany', '982120003,France', '982120004,India', '982120005,Canada', '982120006,Germany', '982120007,France', '982120008,India', '982120009,Canada', '982120010,Germany', '982120011,France', '982120012,India', '982120013,Canada', '982120014,United States', '982120015,Australia', '982120016,Germany', '982120017,France', '982120018,India', '982120019,Canada', '982120020,Germany', '982120021,France', '982120022,India', '982120023,Canada', '982120024,Germany', '982120025,France', '982120026,India', '982120027,Canada', '982120028,United States', '982120029,Australia', '982120030,Germany', '982120031,France', '982120032,India', '982120033,Canada', '982120034,Germany', '982120035,France', '982120036,India', '982120037,Canada', '982120038,Germany', '982120039,France', '982120040,India', '982120041,Canada']
In [5]:
# Create a pair RDD of (number, country)
PairIRDD = IRDD.map(lambda k : (int(k.split(",")[0]), k.split(",")[1]))
# An RDD cannot be broadcast directly; it must first be collected into a local value.
# Collect the pair RDD into a dict on the driver and broadcast that dict.
broadcastStates = sc.broadcast(dict(PairIRDD.collect()))
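The value that gets broadcast is an ordinary Python dict built on the driver, so the whole roaming dataset has to fit in driver memory once `collect()` returns. The sketch below builds the same kind of dict in plain Python, without Spark, from a few lines copied from the output above. The helper name `build_lookup` is hypothetical and is not part of the notebook.

```python
def build_lookup(lines):
    """Turn 'number,country' lines into a {number: country} dict."""
    lookup = {}
    for line in lines:
        if not line.strip():
            continue  # skip blank lines instead of failing on them
        # Split once on the first comma only.
        number, country = line.split(",", 1)
        lookup[int(number)] = country
    return lookup


# Two rows taken from the roaming dataset shown above, plus a blank line.
sample_lines = ["982120000,United States", "982120001,Australia", ""]
lookup = build_lookup(sample_lines)
print(lookup)
```

Because this is a plain dict, a number that appears more than once in the input keeps only the country from its last occurrence.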
In [6]:
# Create the main RDD to be processed
TRDD = sc.textFile("../in/TelecomData.csv")
In [7]:
TRDD.take(5)
Out[7]:
['TXCUST00001,982120000,Male,N,PrePaid,Active,Active,InActive,20,N', 'TXCUST00002,982120001,Male,N,PostPaid,Active,Active,InActive,25,N', 'TXCUST00003,982120002,Male,N,PrePaid,Active,Active,InActive,20,Y', 'TXCUST00004,982120003,Male,Y,PrePaid,Active,Active,InActive,25,N', 'TXCUST00005,982120004,Male,N,PrePaid,Active,Active,InActive,15,N']
In [11]:
TRDD.count()
Out[11]:
500
In [8]:
# Use the broadcast value as lookup data; numbers missing from it yield None
IRoam = TRDD.map(lambda k : (k.split(",")[0], int(k.split(",")[1]),broadcastStates.value.get(int(k.split(",")[1]))))
In [9]:
IRoam.take(5)
Out[9]:
[('TXCUST00001', 982120000, 'United States'), ('TXCUST00002', 982120001, 'Australia'), ('TXCUST00003', 982120002, 'Germany'), ('TXCUST00004', 982120003, 'France'), ('TXCUST00005', 982120004, 'India')]
In [27]:
IRoam.take(50)
Out[27]:
[('TXCUST00001', 982120000, 'United States'), ('TXCUST00002', 982120001, 'Australia'), ('TXCUST00003', 982120002, 'Germany'), ('TXCUST00004', 982120003, 'France'), ('TXCUST00005', 982120004, 'India'), ('TXCUST00006', 982120005, 'Canada'), ('TXCUST00007', 982120006, 'Germany'), ('TXCUST00008', 982120007, 'France'), ('TXCUST00009', 982120008, 'India'), ('TXCUST00010', 982120009, 'Canada'), ('TXCUST00011', 982120010, 'Germany'), ('TXCUST00012', 982120011, 'France'), ('TXCUST00013', 982120012, 'India'), ('TXCUST00014', 982120013, 'Canada'), ('TXCUST00015', 982120014, 'United States'), ('TXCUST00016', 982120015, 'Australia'), ('TXCUST00017', 982120016, 'Germany'), ('TXCUST00018', 982120017, 'France'), ('TXCUST00019', 982120018, 'India'), ('TXCUST00020', 982120019, 'Canada'), ('TXCUST00021', 982120020, 'Germany'), ('TXCUST00022', 982120021, 'France'), ('TXCUST00023', 982120022, 'India'), ('TXCUST00024', 982120023, 'Canada'), ('TXCUST00025', 982120024, 'Germany'), ('TXCUST00026', 982120025, 'France'), ('TXCUST00027', 982120026, 'India'), ('TXCUST00028', 982120027, 'Canada'), ('TXCUST00029', 982120028, 'United States'), ('TXCUST00030', 982120029, 'Australia'), ('TXCUST00031', 982120030, 'Germany'), ('TXCUST00032', 982120031, 'France'), ('TXCUST00033', 982120032, 'India'), ('TXCUST00034', 982120033, 'Canada'), ('TXCUST00035', 982120034, 'Germany'), ('TXCUST00036', 982120035, 'France'), ('TXCUST00037', 982120036, 'India'), ('TXCUST00038', 982120037, 'Canada'), ('TXCUST00039', 982120038, 'Germany'), ('TXCUST00040', 982120039, 'France'), ('TXCUST00041', 982120040, 'India'), ('TXCUST00042', 982120041, 'Canada'), ('TXCUST00043', 982120042, None), ('TXCUST00044', 982120043, None), ('TXCUST00045', 982120044, None), ('TXCUST00046', 982120045, None), ('TXCUST00047', 982120046, None), ('TXCUST00048', 982120047, None), ('TXCUST00049', 982120048, None), ('TXCUST00050', 982120049, None)]
In [10]:
IRoam.count()
Out[10]:
500
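The lookup step can also be written as a named function that splits each line only once. The following is a minimal sketch in plain Python, assuming the same comma-separated layout as the rows shown above. The function name `enrich` and its `default` parameter are hypothetical additions, not part of the notebook.

```python
def enrich(line, lookup, default=None):
    """Return (customer_id, number, country) for one telecom record."""
    fields = line.split(",")  # split once and reuse the parts
    number = int(fields[1])
    # dict.get returns `default` when the number is not in the lookup.
    return (fields[0], number, lookup.get(number, default))


lookup = {982120000: "United States"}
rows = [
    "TXCUST00001,982120000,Male,N,PrePaid,Active,Active,InActive,20,N",
    "TXCUST00043,982120042,Male,N,PostPaid,Active,Active,InActive,15,N",
]
enriched = [enrich(r, lookup, default="Unknown") for r in rows]
print(enriched)
```

In the notebook this could be used as `TRDD.map(lambda line: enrich(line, broadcastStates.value))`, which reads the broadcast dict on the executors just as the original lambda does.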
Let us check whether this lookup is effectively a left join operation.
Checking whether 982120042 is present in the left RDD, "TRDD":
In [20]:
FilteredRDD = TRDD.filter(lambda record: record.split(",")[1] == "982120042")
FilteredRDD.collect()
Out[20]:
['TXCUST00043,982120042,Male,N,PostPaid,Active,Active,InActive,15,N']
Checking whether 982120042 is present in the right RDD, "IRDD":
In [28]:
FilteredRDD = IRDD.filter(lambda record: record.split(",")[0] == "982120042")
FilteredRDD.collect()
Out[28]:
[]
Checking whether 982120042 is present in the output RDD, "IRoam". Note that each record in IRoam is a tuple, not a string, so the filter compares the integer in position 1.
In [29]:
FilteredRDD = IRoam.filter(lambda record: record[1] == 982120042)
FilteredRDD.collect()
Out[29]:
[('TXCUST00043', 982120042, None)]
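Yes, it is effectively a left join: every record of TRDD is kept, and records without a match in the roaming data get None. It is more efficient because broadcasting the lookup avoids the shuffle that a join would require.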
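The equivalence with a left join can be illustrated in plain Python, with one caveat: it holds only while each number appears once in the lookup data. The sketch below compares a naive left outer join with the dict lookup on two keys taken from the data above. The function names are hypothetical, the tuples are ordered (number, customer, country) for brevity, and the duplicate "India" row is invented purely to show the caveat.

```python
def left_join(left, right):
    """Naive left outer join over lists of (key, value) pairs."""
    out = []
    for key, lval in left:
        matches = [rval for rkey, rval in right if rkey == key]
        if not matches:
            out.append((key, lval, None))  # unmatched left rows are kept
        for rval in matches:
            out.append((key, lval, rval))  # one output row per match
    return out


def dict_lookup(left, right):
    """The broadcast-style approach: build a dict, then look up each key."""
    lookup = dict(right)
    return [(key, lval, lookup.get(key)) for key, lval in left]


left = [(982120041, "TXCUST00042"), (982120042, "TXCUST00043")]
right = [(982120041, "Canada")]

# With unique keys on the right, both approaches give the same rows.
same = left_join(left, right) == dict_lookup(left, right)

# Hypothetical duplicate key: the join emits two rows for 982120041,
# while the dict keeps only the last country seen.
dup_right = right + [(982120041, "India")]
differs = left_join(left, dup_right) != dict_lookup(left, dup_right)
print(same, differs)
```

If the lookup data can contain duplicate keys, the dict-based lookup silently keeps one value per key, so it is worth checking for duplicates before broadcasting.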