In this blog, we will look at how to join two or more DataFrames in Spark. An inner join in Spark works exactly like a join in SQL: it combines rows from two or more DataFrames based on a related column between them, and returns only the records that have matching values in both DataFrames/tables. Inner join is the default join type in Spark.
Before we look at a few examples, let's create two DataFrames that will be used to explain the various join methods.
val EmpDF = Seq(
  ("C1","S1","Mark","Henry"),
  ("C2","S2","Jonathan","Phil"),
  ("C3","S3","Tabi","Olonga")
).toDF("CtryID","EmpID","FirstNm","LastNm")

val SalDF = Seq(
  ("C1","S1",2000),
  ("C2","S2",3000)
).toDF("CtryID","EmpID","Salary")

EmpDF.show
+------+-----+--------+------+
|CtryID|EmpID| FirstNm|LastNm|
+------+-----+--------+------+
|    C1|   S1|    Mark| Henry|
|    C2|   S2|Jonathan|  Phil|
|    C3|   S3|    Tabi|Olonga|
+------+-----+--------+------+

SalDF.show
+------+-----+------+
|CtryID|EmpID|Salary|
+------+-----+------+
|    C1|   S1|  2000|
|    C2|   S2|  3000|
+------+-----+------+
join tables using a single join column
Here we will see how to join two DataFrames using a single join column. We will join the EmpDF and SalDF DataFrames on the EmpID column. The syntax is as below:
join(right: Dataset[_], joinExprs: Column): DataFrame
join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
EmpDF.join(SalDF, EmpDF("EmpID") === SalDF("EmpID"), "inner").show()
+------+-----+--------+------+------+-----+------+
|CtryID|EmpID| FirstNm|LastNm|CtryID|EmpID|Salary|
+------+-----+--------+------+------+-----+------+
|    C1|   S1|    Mark| Henry|    C1|   S1|  2000|
|    C2|   S2|Jonathan|  Phil|    C2|   S2|  3000|
+------+-----+--------+------+------+-----+------+
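Since inner is Spark's default join type, the three-argument form above can also be shortened by omitting joinType. Notice that the joined result carries both DataFrames' EmpID and CtryID columns; below is a minimal sketch of dropping the SalDF copies after the join (assuming the EmpDF and SalDF DataFrames defined earlier):

```scala
// Inner join is the default, so the joinType argument can be omitted.
val joined = EmpDF.join(SalDF, EmpDF("EmpID") === SalDF("EmpID"))

// Drop SalDF's copies of the join columns to avoid ambiguous
// duplicate column names in downstream selects.
val deduped = joined.drop(SalDF("EmpID")).drop(SalDF("CtryID"))
deduped.show()
```

Dropping by `SalDF("EmpID")` (a Column, not a String) is what lets Spark know which of the two identically named columns to remove.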
join tables using multiple join columns
Here we will see how to join two DataFrames using multiple join columns. For our examples, we will join on the CtryID and EmpID columns.
We can combine multiple join conditions with &&. Let's see an example:
EmpDF.join(SalDF, EmpDF("EmpID") === SalDF("EmpID") && EmpDF("CtryID") === SalDF("CtryID"), "inner").show()
+------+-----+--------+------+------+-----+------+
|CtryID|EmpID| FirstNm|LastNm|CtryID|EmpID|Salary|
+------+-----+--------+------+------+-----+------+
|    C1|   S1|    Mark| Henry|    C1|   S1|  2000|
|    C2|   S2|Jonathan|  Phil|    C2|   S2|  3000|
+------+-----+--------+------+------+-----+------+
pass join columns in a Seq
If the column names on which we want to join the two DataFrames are the same, we can pass them in a Seq. In the DataFrames created earlier, the join columns CtryID and EmpID carry the same names in both, so we can write the join condition as below. Note that this form keeps only one copy of each join column in the result.
Syntax: Seq("Col1","Col2",...,"Coln")
EmpDF.join(SalDF, Seq("EmpID","CtryID"), "inner").show()
+-----+------+--------+------+------+
|EmpID|CtryID| FirstNm|LastNm|Salary|
+-----+------+--------+------+------+
|   S1|    C1|    Mark| Henry|  2000|
|   S2|    C2|Jonathan|  Phil|  3000|
+-----+------+--------+------+------+
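When joining on a single shared column, the Dataset API also accepts a plain String instead of a Seq. A small sketch, again using the DataFrames above:

```scala
// Joining on one common column name; this behaves like
// Seq("EmpID") and likewise keeps a single EmpID column
// in the output.
EmpDF.join(SalDF, "EmpID").show()
```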
inner join using spark sql expression
If you are familiar with SQL, you may prefer to write most of your code inside spark.sql(). Register the DataFrames as temporary views and query them directly. Let's see an example:
EmpDF.createOrReplaceTempView("EMP_TAB")
SalDF.createOrReplaceTempView("SAL_TAB")

spark.sql("Select * from EMP_TAB A join SAL_TAB B on A.empid=B.empid and a.ctryid=b.ctryid").show()
+------+-----+--------+------+------+-----+------+
|CtryID|EmpID| FirstNm|LastNm|CtryID|EmpID|Salary|
+------+-----+--------+------+------+-----+------+
|    C1|   S1|    Mark| Henry|    C1|   S1|  2000|
|    C2|   S2|Jonathan|  Phil|    C2|   S2|  3000|
+------+-----+--------+------+------+-----+------+
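Because Select * returns the join columns from both views, it is often cleaner to list the columns you want explicitly. A sketch of the same query with an explicit column list (using the EMP_TAB and SAL_TAB views registered above):

```scala
// Selecting named columns avoids the duplicate CtryID/EmpID
// columns that Select * produces.
spark.sql("""
  SELECT A.CtryID, A.EmpID, A.FirstNm, A.LastNm, B.Salary
  FROM EMP_TAB A
  JOIN SAL_TAB B
    ON A.EmpID = B.EmpID AND A.CtryID = B.CtryID
""").show()
```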
using where or filter to provide the join condition
The join condition can also be provided in a where or filter clause. Let's see an example.
Syntax: df1.join(df2).where(join_conditions)
EmpDF.join(SalDF).where(EmpDF("EmpID") === SalDF("EmpID") && EmpDF("CtryID") === SalDF("CtryID")).show()
+------+-----+--------+------+------+-----+------+
|CtryID|EmpID| FirstNm|LastNm|CtryID|EmpID|Salary|
+------+-----+--------+------+------+-----+------+
|    C1|   S1|    Mark| Henry|    C1|   S1|  2000|
|    C2|   S2|Jonathan|  Phil|    C2|   S2|  3000|
+------+-----+--------+------+------+-----+------+

EmpDF.join(SalDF).filter(EmpDF("EmpID") === SalDF("EmpID") && EmpDF("CtryID") === SalDF("CtryID")).show()
+------+-----+--------+------+------+-----+------+
|CtryID|EmpID| FirstNm|LastNm|CtryID|EmpID|Salary|
+------+-----+--------+------+------+-----+------+
|    C1|   S1|    Mark| Henry|    C1|   S1|  2000|
|    C2|   S2|Jonathan|  Phil|    C2|   S2|  3000|
+------+-----+--------+------+------+-----+------+
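Logically, join() without a condition starts from a cross join and the where/filter then restricts it, but the Catalyst optimizer should push the predicate into the join, so the physical plan normally matches the explicit-condition version. A quick way to check this yourself (a sketch; the exact plan output depends on your Spark version):

```scala
// Compare the physical plans; both should show a join keyed on
// EmpID rather than a cartesian product followed by a filter.
EmpDF.join(SalDF, EmpDF("EmpID") === SalDF("EmpID")).explain()
EmpDF.join(SalDF).where(EmpDF("EmpID") === SalDF("EmpID")).explain()
```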
If you want to learn more about Spark Filter Function you can check out this blog.
🙂 kudos for learning something new 🙂