A broadcast variable lets the programmer keep a read-only copy of a variable cached on each machine/node where Spark is executing its job. The variable is serialized so that it can be sent over the network, and it is deserialized before it is used. Also remember that the value of a broadcast variable is read only; we cannot change it once it is created.
Create a Broadcast Variable
Here we will see how to create a broadcast variable and how to fetch its value. To fetch the value we use the value method. We will create broadcast variables of an Integer, a List and a Map.
Syntax to Create: sc.broadcast(v) or spark.sparkContext.broadcast(v)
Syntax to Fetch: broadcastVariable.value
//Broadcast an Integer
val int1: Int = 3
val broadInt = spark.sparkContext.broadcast(int1)
println(broadInt.value) //output: 3

//Broadcast a List
val lst1: Seq[Int] = List(1, 2, 3)
val broadList = spark.sparkContext.broadcast(lst1)
println(broadList.value(0)) //output: 1
println(broadList.value(1)) //output: 2

//Broadcast a Map
val map1 = Map(1 -> "one", 2 -> "two")
val broadMap = spark.sparkContext.broadcast(map1)
println(broadMap.value(1)) //output: one
println(broadMap.value(2)) //output: two
When to use Broadcast Variable
If broadcast variables are so effective, shouldn't we convert every variable into a broadcast variable? The short answer is no. To understand this, let's first recap what we know about Jobs, Stages and Tasks. A Job consists of one or more Stages, and each Stage is split into one or more Tasks. Spark automatically broadcasts the common data needed by the Tasks inside each Stage. So if a Job has only one Stage, broadcasting the variables may not be useful. But if a Job has multiple Stages and the Tasks in those Stages need the same variable, then we should broadcast it.
Say a Job has 3 wide transformations, so 3 Stages will be created (remember that a new Stage is created every time there is a shuffle operation), and we have a variable that is used by Tasks in all 3 Stages. Normally the variable would be shipped to the executors again for each Stage, which means high network IO. If we convert it to a broadcast variable, it moves to each node once and is reused during all 3 Stage executions. This reduces network IO.
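As a rough sketch of this pattern, assuming a local SparkSession and a hypothetical country-code lookup Map, we can broadcast the Map once and read it from Tasks on both sides of a shuffle:

```scala
import org.apache.spark.sql.SparkSession

// Local session for the sketch; in a real job the session already exists.
val spark = SparkSession.builder()
  .appName("BroadcastLookup")
  .master("local[*]")
  .getOrCreate()
val sc = spark.sparkContext

// Small lookup table we want cached on every executor (hypothetical data).
val countryNames = Map("IN" -> "India", "US" -> "United States")
val broadNames = sc.broadcast(countryNames)

val rdd = sc.parallelize(Seq(("IN", 5), ("US", 3), ("IN", 2)))

// Tasks read the broadcast copy instead of having the Map shipped per Stage.
val totals = rdd
  .reduceByKey(_ + _)                                // shuffle -> new Stage
  .map { case (code, n) =>
    (broadNames.value.getOrElse(code, "unknown"), n) // lookup on the executor
  }
  .collect()
  .toMap

println(totals) // Map(India -> 7, United States -> 3)
spark.stop()
```

Because `broadNames` is referenced in a transformation after the `reduceByKey` shuffle boundary, the same cached copy serves Tasks in both Stages rather than being re-sent with each Stage's Tasks.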
unpersist Spark Broadcast Variable
By now we know that when a variable is broadcast it is copied to the executors. When we unpersist the broadcast variable, it is removed from all the executors. If the variable is used again, it is re-broadcast to the executors. Let's see an example.
val int1: Int = 3
val broadInt = spark.sparkContext.broadcast(int1)
println(broadInt.value) // executors receive a copy when Tasks first use it
broadInt.unpersist()    // cached copies are removed from all executors
println(broadInt.value) // on next use the value is re-broadcast from the driver
destroy Spark Broadcast Variable
Using destroy we can remove the broadcast variable from both the executors and the driver. If we try to access the variable again after destroying it, our Job will throw an error. Let's see an example.
val int1: Int = 3
val broadInt = spark.sparkContext.broadcast(int1)
println(broadInt.value) // variable is available to driver and executors
broadInt.destroy()      // variable is removed from both driver and executors
println(broadInt.value) // Error: SparkException: Attempted to use Broadcast(0) after it was destroyed
So today we learned what a Spark broadcast variable is, when to use it and when not to, and how to unpersist and destroy it.
You can check my other blog posts here.
🙂 kudos for learning something new 🙂