In this blog, we will understand how to join two or more dataframes in Spark. Joins in Spark work just like joins in SQL: if you are unfamiliar with the concept, a join combines rows from two or more dataframes based on a related column between them. An inner join returns only the records that have matching values in both dataframes/tables, and inner join is the default join type in Spark.

Before we look at a few examples, let's create two dataframes that will be used to explain the various join methods.

  // toDF comes from spark.implicits; spark-shell imports these automatically,
  // in a standalone application add: import spark.implicits._
  val EmpDF = Seq(("C1","S1","Mark","Henry"),("C2","S2","Jonathan","Phil"),("C3","S3","Tabi","Olonga"))
                 .toDF("CtryID" , "EmpID" , "FirstNm" , "LastNm")

  val SalDF = Seq(("C1","S1",2000),("C2","S2",3000)).toDF("CtryID" , "EmpID" , "Salary")
  EmpDF.show
  +------+-----+--------+------+
  |CtryID|EmpID| FirstNm|LastNm|
  +------+-----+--------+------+
  |    C1|   S1|    Mark| Henry|
  |    C2|   S2|Jonathan|  Phil|
  |    C3|   S3|    Tabi|Olonga|
  +------+-----+--------+------+
  SalDF.show
  +------+-----+------+
  |CtryID|EmpID|Salary|
  +------+-----+------+
  |    C1|   S1|  2000|
  |    C2|   S2|  3000|
  +------+-----+------+

join tables using a single join column

Here we will see how to join two dataframes using a single join column. We will join the EmpDF and SalDF dataframes on the EmpID column. The syntax is as follows:

join(right: Dataset[_], joinExprs: Column): DataFrame

join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame

  EmpDF.join(SalDF, EmpDF("EmpID") === SalDF("EmpID") , "inner" ).show()
  +------+-----+--------+------+------+-----+------+
  |CtryID|EmpID| FirstNm|LastNm|CtryID|EmpID|Salary|
  +------+-----+--------+------+------+-----+------+
  |    C1|   S1|    Mark| Henry|    C1|   S1|  2000|
  |    C2|   S2|Jonathan|  Phil|    C2|   S2|  3000|
  +------+-----+--------+------+------+-----+------+
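
Since inner is the default join type in Spark, the joinType argument can also be omitted; a minimal sketch, equivalent to the call above:

  // joinType defaults to "inner", so this produces the same result as above
  EmpDF.join(SalDF, EmpDF("EmpID") === SalDF("EmpID")).show()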

join tables using multiple join columns

Here we will see how to join two dataframes using multiple join columns. For our examples, we will join using the CtryID and EmpID columns.

Multiple join conditions can be combined with &&. Let's see an example.

  EmpDF.join(SalDF, EmpDF("EmpID") === SalDF("EmpID") && EmpDF("CtryID") === SalDF("CtryID"), "inner" ).show()
  +------+-----+--------+------+------+-----+------+
  |CtryID|EmpID| FirstNm|LastNm|CtryID|EmpID|Salary|
  +------+-----+--------+------+------+-----+------+
  |    C1|   S1|    Mark| Henry|    C1|   S1|  2000|
  |    C2|   S2|Jonathan|  Phil|    C2|   S2|  3000|
  +------+-----+--------+------+------+-----+------+
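
Notice that the result above carries duplicate CtryID and EmpID columns, one from each dataframe, so an unqualified reference to EmpID afterwards would be ambiguous. A minimal sketch of one way around this, qualifying each column through its source dataframe (the val name joined is our own):

  val joined = EmpDF.join(SalDF,
    EmpDF("EmpID") === SalDF("EmpID") && EmpDF("CtryID") === SalDF("CtryID"), "inner")
  // Qualify columns via the originating dataframe to avoid ambiguity errors
  joined.select(EmpDF("EmpID"), EmpDF("FirstNm"), SalDF("Salary")).show()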

pass join columns in a Seq

If the column names on which we want to join the two dataframes are the same, we can pass those names in a Seq. In the dataframes created earlier, the join columns CtryID and EmpID have identical names in both, so we can express the join condition as below. Note in the output that each join column appears only once.

Syntax: Seq("Col1","Col2", ..., "Coln")
  EmpDF.join(SalDF, Seq("EmpID","CtryID"), "inner" ).show()
  +-----+------+--------+------+------+
  |EmpID|CtryID| FirstNm|LastNm|Salary|
  +-----+------+--------+------+------+
  |   S1|    C1|    Mark| Henry|  2000|
  |   S2|    C2|Jonathan|  Phil|  3000|
  +-----+------+--------+------+------+
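
Because the Seq form deduplicates the join columns, they can be referenced by plain name afterwards without any ambiguity. A small sketch (joinedSeq is a name we introduce here):

  val joinedSeq = EmpDF.join(SalDF, Seq("EmpID","CtryID"), "inner")
  // The join columns appear only once, so unqualified names are fine
  joinedSeq.select("EmpID", "FirstNm", "Salary").show()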

inner join using a spark sql expression

If you are familiar with SQL, you may prefer to write most of your code inside spark.sql(). Register the dataframes as temporary views first, then query them. Let's see an example.

  EmpDF.createOrReplaceTempView("EMP_TAB")
  SalDF.createOrReplaceTempView("SAL_TAB")
  spark.sql("Select * from EMP_TAB A join SAL_TAB B on A.empid=B.empid and a.ctryid=b.ctryid").show()
  +------+-----+--------+------+------+-----+------+
  |CtryID|EmpID| FirstNm|LastNm|CtryID|EmpID|Salary|
  +------+-----+--------+------+------+-----+------+
  |    C1|   S1|    Mark| Henry|    C1|   S1|  2000|
  |    C2|   S2|Jonathan|  Phil|    C2|   S2|  3000|
  +------+-----+--------+------+------+-----+------+
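
Note that SELECT * returns the join columns twice, once from each view. If that is undesirable, one option is to list the columns explicitly; a sketch:

  spark.sql("""
    SELECT A.CtryID, A.EmpID, A.FirstNm, A.LastNm, B.Salary
    FROM EMP_TAB A
    INNER JOIN SAL_TAB B
      ON A.EmpID = B.EmpID AND A.CtryID = B.CtryID
  """).show()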

using where or filter to provide the join condition

The join condition can also be provided in a where or filter clause applied after the join. Let's see an example.

Syntax: df1.join(df2).where(join_conditions)

  EmpDF.join(SalDF).where(EmpDF("EmpID") === SalDF("EmpID") && EmpDF("CtryID") === SalDF("CtryID")).show()
  +------+-----+--------+------+------+-----+------+
  |CtryID|EmpID| FirstNm|LastNm|CtryID|EmpID|Salary|
  +------+-----+--------+------+------+-----+------+
  |    C1|   S1|    Mark| Henry|    C1|   S1|  2000|
  |    C2|   S2|Jonathan|  Phil|    C2|   S2|  3000|
  +------+-----+--------+------+------+-----+------+
  EmpDF.join(SalDF).filter(EmpDF("EmpID") === SalDF("EmpID") && EmpDF("CtryID") === SalDF("CtryID")).show()
  +------+-----+--------+------+------+-----+------+
  |CtryID|EmpID| FirstNm|LastNm|CtryID|EmpID|Salary|
  +------+-----+--------+------+------+-----+------+
  |    C1|   S1|    Mark| Henry|    C1|   S1|  2000|
  |    C2|   S2|Jonathan|  Phil|    C2|   S2|  3000|
  +------+-----+--------+------+------+-----+------+
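
Although this reads like a cross join followed by a filter, Catalyst normally pushes such equality predicates down into the join itself, so it is planned as an inner join rather than a full Cartesian product; the exact plan depends on your Spark version. You can inspect it yourself:

  // Print the physical plan; the predicates typically show up as the join condition
  EmpDF.join(SalDF)
       .where(EmpDF("EmpID") === SalDF("EmpID") && EmpDF("CtryID") === SalDF("CtryID"))
       .explain()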

If you want to learn more about the Spark Filter Function, you can check out this blog.

🙂 kudos for learning something new 🙂
