Repartition in Spark performs a full shuffle of the data and splits it into chunks based on user input. Using it we can either increase or decrease the number of partitions. There are three ways in which Spark can repartition the data, and we will see examples of each. But first, let us understand why we even need to repartition.

why partition data

Before we look at the need for repartition, let's quickly understand what Spark partitions are. Say we have 1GB of data and it takes 20 minutes to process it on one node. If we instead break it into 4 files of 256MB each and send them to 4 nodes, the total time taken drops to about 5 minutes. By breaking the data into 4 partitions we increase parallelism. Hence you can say that a partition is the atomic chunk of data in Spark.

The next question that should come to mind is: how do we break 1GB of data into 4 chunks of 256MB each? This is where repartition comes in. Using repartition we can break a dataframe into as many partitions as we want, and we can either increase or decrease the number of partitions. There are several repartition methods; let's go through each of them below.

repartition by integer

This returns a dataset with exactly as many partitions as the integer value specifies. Repartition by integer uses round-robin partitioning. Let's see an example of how to repartition a dataframe.

def repartition(numPartitions: Int): Dataset[T]

Below we create 3 dataframes, each with a Number column holding the values 1 to 1000 and a Part column holding Part1/Part2/Part3 respectively.

  import org.apache.spark.sql.functions.lit
  import spark.implicits._   // needed for toDF on the RDD

  val num1: Seq[Int] = (1 to 1000)
  val df1 = spark.sparkContext.parallelize(num1, 2).toDF("Number").withColumn("Part", lit("Part1"))
  val num2: Seq[Int] = (1 to 1000)
  val df2 = spark.sparkContext.parallelize(num2).toDF("Number").withColumn("Part", lit("Part2"))
  val num3: Seq[Int] = (1 to 1000)
  val df3 = spark.sparkContext.parallelize(num3).toDF("Number").withColumn("Part", lit("Part3"))
  val df = df1.union(df2).union(df3)
  println(df.rdd.getNumPartitions)                 //4
  println(df.repartition(10).rdd.getNumPartitions) //10

The getNumPartitions method returns the number of partitions of the underlying RDD. As you can see in the example above, the initial number of partitions is 4. If we want to increase that to 10 we call df.repartition(10), and when we call getNumPartitions on the result we see 10 partitions.
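To actually see the round-robin distribution, we can tag each row with its partition id. The snippet below is a quick check (assuming the df built above) using the built-in spark_partition_id() function; each of the 10 partitions should hold roughly the same share of the 3000 rows.

  import org.apache.spark.sql.functions.spark_partition_id

  // Count rows per partition after repartition(10).
  // Round-robin partitioning spreads the rows almost evenly,
  // so every partition should show roughly 300 rows.
  df.repartition(10)
    .groupBy(spark_partition_id().as("partition_id"))
    .count()
    .orderBy("partition_id")
    .show()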

We can also reduce the number of partitions. Let’s see how we can reduce it to 1 partition.

println(df.repartition(1).rdd.getNumPartitions) //1
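If the goal is only to reduce the number of partitions, coalesce is usually the cheaper option because it merges existing partitions instead of shuffling everything (we touch on this again in the things to remember section). A minimal comparison:

  // repartition(1) shuffles every row; coalesce(1) just merges the existing partitions
  println(df.repartition(1).rdd.getNumPartitions) //1
  println(df.coalesce(1).rdd.getNumPartitions)    //1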

repartition by column name

This returns a new Dataset partitioned by the given partitioning column, using spark.sql.shuffle.partitions as the number of partitions. The resulting Dataset is hash partitioned. This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL). Here partitioning happens on the specified column, but the number of partitions is decided by the spark.sql.shuffle.partitions setting.
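For reference, this variant's signature takes only the partitioning columns:

def repartition(partitionExprs: Column*): Dataset[T]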

Here you will have 2 questions:

  • First, how do we find out the default number of Spark shuffle partitions?

By default, the number of Spark shuffle partitions is 200. If we want to change this we can do so with spark.conf.set("spark.sql.shuffle.partitions", 50).
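If you want to verify the current value on your own cluster before changing it, you can read it straight from the runtime config:

  // Prints the current shuffle partition setting, "200" unless it has been overridden
  println(spark.conf.get("spark.sql.shuffle.partitions"))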

  • Second, what is hash partitioning on a column key?

Hash partitioning distributes the data across partitions using the column as the key. Each row's key value is hashed, and that hash code decides which partition the row goes to, so all rows with the same key value always end up in the same partition. When the key values are numerous and evenly spread, the partitions end up roughly the same size.

Let’s see an example:

  //Using the same dataframes created in the first example
  import org.apache.spark.sql.functions.col

  val df = df1.union(df2).union(df3)
  println(df.repartition(col("Part")).rdd.getNumPartitions) //200
  spark.conf.set("spark.sql.shuffle.partitions", 50)
  println(df.repartition(col("Part")).rdd.getNumPartitions) //50
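To see the hash partitioning at work we can again use spark_partition_id(). Because the Part column has only 3 distinct values, at most 3 of the 50 partitions actually receive data, and every row with the same Part value lands in the same partition:

  import org.apache.spark.sql.functions.spark_partition_id

  // Each distinct value of "Part" hashes to exactly one partition,
  // so at most 3 of the 50 partitions will contain rows.
  df.repartition(col("Part"))
    .groupBy(col("Part"), spark_partition_id().as("partition_id"))
    .count()
    .show()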

repartition by integer and column name

This is similar to repartitioning by column, but instead of spark.sql.shuffle.partitions deciding the number of partitions, the integer value decides how many partitions are created. Hash partitioning is used here as well.
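The corresponding signature takes both the target number of partitions and the partitioning columns:

def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]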

Let's see an example:

  //Using the same dataframes created in the first example
  val df = df1.union(df2).union(df3)
  println(df.repartition(20,col("Part")).rdd.getNumPartitions) //20

things to remember

  • Repartition triggers a full shuffle.
  • As the entire data is shuffled, repartition is a costly operation and should be used judiciously.
  • Unlike Coalesce, Repartition can be used to both increase and decrease the number of partitions.
  • Repartition by integer triggers round-robin partitioning.
  • Repartition by column, and by integer plus column, triggers hash partitioning.

Conclusion

So today we learned what Spark partitions are and how to repartition an RDD/DataFrame. We also saw the various repartition methods.

🙂 kudos for learning something new 🙂

You can check out my other Spark tutorials here.
