Using the Spark filter function, you can retrieve the records from a DataFrame or Dataset that satisfy a given condition. People from a SQL background can also use where(). If you are comfortable in Scala, it is easier to remember filter(), and if you are comfortable in SQL, it is easier to remember where(). No matter which you use, both work in exactly the same manner. Also note that filter() is a transformation and not an action, and since it does not shuffle data, it is a narrow transformation.
Let's check the various syntaxes for filters and how to use them.
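To make the equivalence concrete, here is a minimal sketch that applies the same predicate through filter() and where() and compares the results. The local SparkSession setup and the names `viaFilter` and `viaWhere` are assumptions for illustration; in spark-shell the `spark` session already exists.

```scala
import org.apache.spark.sql.SparkSession

// Assumed local session for experimentation; spark-shell already provides `spark`.
val spark = SparkSession.builder().master("local[*]").appName("filter-vs-where").getOrCreate()
import spark.implicits._

val df = Seq(("Smith", 45), ("Andy", 77)).toDF("Name", "Marks")

// Same predicate, two method names.
val viaFilter = df.filter($"Marks" > 50)
val viaWhere  = df.where($"Marks" > 50)

// Both calls are lazy transformations; nothing runs until an action such as collect().
val filterRows = viaFilter.collect().toSeq
val whereRows  = viaWhere.collect().toSeq
```

Both sequences should hold the same rows, since where() is an alias for filter().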
Using Dataframe Filter Function on Column Instance
When the column you want to filter on is an instance of the Column class (for example 'column, $"column" or col("column")), you use filter as shown below.
df.filter('col <comparison operator> value)
val dfTN = Seq(("Smith",45),("Andy",77)).toDF("Name","Marks")
dfTN.filter('Name === "Andy").show
+----+-----+
|Name|Marks|
+----+-----+
|Andy|   77|
+----+-----+
If you want to filter on multiple columns, you can combine conditions using AND (&&) or OR (||). Chaining several filter() calls is equivalent to combining the conditions with &&. Let's check it with an example.
val dfTN = Seq(("Smith",45),("Andy",77)).toDF("Name","Marks")
// AND: both conditions must hold
dfTN.filter('Name === "Andy" && 'Marks > 50).show
// OR: at least one condition must hold
dfTN.filter('Name === "Andy" || 'Marks > 50).show
// Chained filters behave like AND
dfTN.filter('Name === "Andy").filter('Marks > 50).show
// For this data, all three statements return the same row
+----+-----+
|Name|Marks|
+----+-----+
|Andy|   77|
+----+-----+
Using Filter Function with SQL like expressions
In the previous chapter we used selectExpr, which lets us select columns using SQL-like expressions. Here we will learn to use the filter function with SQL-like expressions, which will be easier for people with a SQL background.
df.filter("col <comparison operator> value")
The entire expression is within double quotes.
val dfTN = Seq(("Smith",45),("Andy",77)).toDF("Name","Marks")
dfTN.filter("Marks > 50").show
+----+-----+
|Name|Marks|
+----+-----+
|Andy|   77|
+----+-----+
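Because the string is parsed as a SQL boolean expression, the usual SQL operators such as AND, OR and LIKE can be combined inside it. The following is a minimal sketch, assuming a local SparkSession; the session setup and the names `highA` and `smithOrHigh` are illustrative and not from the original.

```scala
import org.apache.spark.sql.SparkSession

// Assumed local session for experimentation; spark-shell already provides `spark`.
val spark = SparkSession.builder().master("local[*]").appName("filter-sql-expr").getOrCreate()
import spark.implicits._

val dfTN = Seq(("Smith", 45), ("Andy", 77)).toDF("Name", "Marks")

// AND combined with LIKE: marks above 50 and a name starting with "A".
val highA = dfTN.filter("Marks > 50 AND Name LIKE 'A%'")

// OR: either the name is Smith or the marks are above 50.
val smithOrHigh = dfTN.filter("Name = 'Smith' OR Marks > 50")

val highANames       = highA.select("Name").as[String].collect().toSeq
val smithOrHighCount = smithOrHigh.count()
```

Note that string literals inside the expression use single quotes, which avoids escaping the double quotes that wrap the whole expression.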
Filter on Column Alias
Using the Spark filter function we can filter on a column alias as well. One thing to keep in mind is that when filtering on a column alias, the filter must always come after the select that defines the alias. Let's check it with an example.
val dfTN = Seq(("Smith", 50), ("Divya", 56)).toDF("Name","Marks")

// This gives a result
dfTN.select('Name, ('Marks - 5).as("NewMarks")).filter('NewMarks > 50).show()
+-----+--------+
| Name|NewMarks|
+-----+--------+
|Divya|      51|
+-----+--------+

// This throws an error
dfTN.filter('NewMarks > 50).select('Name, ('Marks - 5).as("NewMarks")).show()
org.apache.spark.sql.AnalysisException: cannot resolve '`NewMarks`' given input columns: [Name, Marks];
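If the filter really has to come before the select, one option is to filter on the underlying expression instead of the alias, since the alias does not exist yet at that point. This is a minimal sketch of that workaround, assuming a local SparkSession; the session setup and the name `filteredFirst` are illustrative.

```scala
import org.apache.spark.sql.SparkSession

// Assumed local session for experimentation; spark-shell already provides `spark`.
val spark = SparkSession.builder().master("local[*]").appName("filter-alias").getOrCreate()
import spark.implicits._

val dfTN = Seq(("Smith", 50), ("Divya", 56)).toDF("Name", "Marks")

// Filter on the expression (Marks - 5) itself, then define the alias in select.
val filteredFirst = dfTN
  .filter(($"Marks" - 5) > 50)
  .select($"Name", ($"Marks" - 5).as("NewMarks"))

val rows = filteredFirst.as[(String, Int)].collect().toSeq
```

The trade-off is that the expression is written twice, so the select-then-filter order from the example above is usually easier to maintain.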
SPARK Filter Function Internals
A common question that comes to mind is: when we apply a filter on a table/DataFrame/Dataset, (i) is the complete data fetched first and the filter applied afterwards, or (ii) is the filter applied as the data is fetched from the source?
To understand this we need to understand predicate pushdown in Spark. Simply put, Spark tries to push the filter condition down to the data source. It does so to fetch only the required rows into memory, hence reducing the data size and network bandwidth usage.
Let's check this with an example.
scala> val student = Seq((1, "Smith", 23), (2, "Maven", 24), (3, "Anu", 24)).toDF("id", "student_name", "age")
student: org.apache.spark.sql.DataFrame = [id: int, student_name: string ... 1 more field]

scala> val subject = Seq((1, "Chemistry"), (2, "Math"), ( 3, "Science")).toDF("stu_id", "Subject")
subject: org.apache.spark.sql.DataFrame = [stu_id: int, Subject: string]

scala> student.createOrReplaceTempView("student")

scala> subject.createOrReplaceTempView("subject")

scala> spark.sql("""Select stu.id,stu.student_name,stu.age from student stu join subject sub on stu.id=sub.stu_id where stu.age > 23 and sub.Subject="Math" """).show
+---+------------+---+
| id|student_name|age|
+---+------------+---+
|  2|       Maven| 24|
+---+------------+---+
We have created two tables, "student" and "subject", joined them on the student id (stu.id = sub.stu_id), and applied the filters. So what should happen: (i) is all the data fetched and joined before the filter is applied, or (ii) is the filter first applied on the individual tables before they are joined? Let's check the parsed logical plan and the physical plan (calling explain(true) on the query prints them).
== Parsed Logical Plan ==
'Project ['stu.id, 'stu.student_name, 'stu.age]
+- 'Filter (('stu.age > 23) && ('sub.Subject = Math))
   +- 'Join Inner, ('stu.id = 'sub.stu_id)
      :- 'SubqueryAlias stu
      :  +- 'UnresolvedRelation student
      +- 'SubqueryAlias sub
         +- 'UnresolvedRelation subject

== Physical Plan ==
*(2) Project [id#568, student_name#569, age#570]
+- *(2) BroadcastHashJoin [id#568], [stu_id#579], Inner, BuildRight
   :- *(2) Project [_1#564 AS id#568, _2#565 AS student_name#569, _3#566 AS age#570]
   :  +- *(2) Filter (_3#566 > 23)
   :     +- LocalTableScan [_1#564, _2#565, _3#566]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- *(1) Project [_1#576 AS stu_id#579]
         +- *(1) Filter (isnotnull(_2#577) && (_2#577 = Math))
            +- LocalTableScan [_1#576, _2#577]
Before I explain the plan, remember that plans are read bottom-up. If you check the logical plan, you see that the join is done first and the filter is applied afterwards. But by the time we reach the physical plan, the filters have been pushed down to the individual tables, below the join. This is the work of the Catalyst optimizer and is known as Spark predicate pushdown.
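The same behaviour can be observed with the DataFrame API. The sketch below writes the filter after the join and then prints the plans. It assumes a local SparkSession, and the names `result` and `rows` are illustrative. The exact plan text depends on the Spark version, so it may not match the output shown above line for line.

```scala
import org.apache.spark.sql.SparkSession

// Assumed local session for experimentation; spark-shell already provides `spark`.
val spark = SparkSession.builder().master("local[*]").appName("filter-pushdown").getOrCreate()
import spark.implicits._

val student = Seq((1, "Smith", 23), (2, "Maven", 24), (3, "Anu", 24)).toDF("id", "student_name", "age")
val subject = Seq((1, "Chemistry"), (2, "Math"), (3, "Science")).toDF("stu_id", "Subject")

// The filter is written after the join, as in the SQL query.
val result = student
  .join(subject, $"id" === $"stu_id")
  .filter($"age" > 23 && $"Subject" === "Math")
  .select("id", "student_name", "age")

// Prints the parsed, analyzed and optimized logical plans plus the physical plan.
result.explain(true)

val rows = result.as[(Int, String, Int)].collect().toSeq
```

Comparing the parsed logical plan with the later plans in the printed output is a quick way to check where the optimizer placed each filter condition.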
Conclusion
So today we learnt:
- What the Spark filter function is.
- How to use the filter function when the column is an instance of the Column class.
- How to use SQL-like expressions in the filter function.
- Spark filter internals.
- How to use filter on a column alias.
🙂 kudos for learning something new 🙂