Broadcast variable helps the programmer to keep a read only copy of the variable in each machine/node where Spark is executing its job. The variable is converted to serializable form so that it can be sent over the network, and before it is used it needs to be desterilized. Also, remember that the value of broadcast variable is read only and we cannot change the value once it is created.
Create a Broadcast Variable
Here we will see how we can create and fetch the value from a broadcast variable . To fetch we need to use value method. We will create broadcast variable of an Integer , List and Map.
Syntax to Create: SparkContext.broadcast(v) or Spark.SparkContext.broadcast(v)
Syntax to Fetch : broadcastVariable.value()
//Broadcast an Integer val int1: Int = 3 val broadInt = spark.sparkContext.broadcast(int1) println(broadInt.value) //output:3 //Broadcast a List val lst1: Seq[Int] = List(1,2,3) val broadList = spark.sparkContext.broadcast(lst1) println(broadList.value(0)) //output:1 println(broadList.value(1)) //output:2 //Broadcast a Map val map1 = Map(1->"one",2->"two") val broadMap = spark.sparkContext.broadcast(map1) println(broadMap.value(1)) //output:one println(broadMap.value(2)) //output:two
When to use Broadcast Variable
If broadcast variable is so effective shouldn’t we convert every variable into a broadcast variable. The short answer is NO. To understand this lets first recap what we know about Job / Stage / Task . A Job consists of 1 or more Stage which is split into 1 or more Tasks. Spark automatically broadcasts the data that is needed by the Tasks inside each Stage. So if a Job has only one Stage then broadcasting the variables may not be useful. But lets say that there are multiple stages in the job and the tasks inside multiple stages need to use the same Variable , in such case we should broadcast the variable.
Lets say a Job has 3 wide transformation, hence 3 Stages will be created. Remember that Stages are created every time there is shuffle operation. We have a variable which will be used by Tasks in all the 3 Stages. In normal scenario during each Stage execution the variables will be automatically broadcasted each time, which means high network IO . If we convert this to a broadcast variable, then this will move to the node once and will be used during all the 3 Stage execution. This reduces network IO.
unpersist Spark Broadcast Variable
By now we know that a variable is broadcasted it is copied to the executors. When we unpersist the broadcast variable it is removed from all the executors. If the variable is used again then it is broadcasted again to all executors. Lets see an example.
val int1: Int = 3 val broadInt = spark.sparkContext.broadcast(int1) println(broadInt.value) //variable is sent to executor broadInt.unpersist() // variable is removed from all executors println(broadInt.value) //variable is sent from driver to executor again
destroy Spark Broadcast Variable
Using destroy we can remove the broadcast variable from both executor and driver. If we try to access the variable again after destroying our Jobs will throw error. Lets see an example
val int1: Int = 3 val broadInt = spark.sparkContext.broadcast(int1) println(broadInt.value) //variable is sent to executor broadInt.destroy() // variable is removed from both driver and executors println(broadInt.value) //Error: SparkException: Attempted to use Broadcast(0) after it was destroyed
So today we understood what is Spark Broadcast Variable. When to use and when not to use this and how to unpersist and destroy the variable.
You can check my other blog posts here.
🙂 kudos for learning something new 🙂