Apart from the basic Numeric, String and Datetime data types, Spark SQL also has an ArrayType column. This type is not limited to Array; other Scala collections such as Seq and List are also stored as ArrayType. Note that all the code written below is in Scala.
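The snippets in this post assume that a SparkSession named spark and a couple of imports are already in scope. Here is a minimal setup sketch, assuming a local run; the app name and the local[*] master are placeholder choices, and the column names are made up for illustration. It also checks that Seq, List and Array fields all end up as array columns.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Local session for experimentation (app name and master are assumptions).
val spark = SparkSession.builder()
  .appName("ArrayTypeExamples")
  .master("local[*]")
  .getOrCreate()

// Brings toDF and the collection-to-DataFrame conversions into scope.
import spark.implicits._

// One row holding a Seq, a List and an Array (hypothetical column names).
val collDF = Seq((1, Seq("a", "b"), List(1, 2), Array(1.0, 2.0)))
  .toDF("id", "seqCol", "listCol", "arrCol")

collDF.printSchema()

// Type name of each column as reported by the schema.
val typeNames = collDF.schema.fields.map(_.dataType.typeName).toSeq
```

If you are working in spark-shell, the session is normally created for you, so only the parts you are missing are needed.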

Spark Array Type Column

An array is a collection that stores elements of the same data type. Let’s see an example of what an ArrayType column looks like. In the example below we store an Age together with the names of all the employees of that age; note that the arrays in different rows can have different lengths.

 val arr = Seq((43,Array("Mark","Henry")) , (45,Array("Penny","Stewart","George")))
 val arrDF = spark.sparkContext.parallelize(arr).toDF("Age","EmployeeName")

  arrDF.show(false)
  +---+------------------------+
  |Age|EmployeeName            |
  +---+------------------------+
  |43 |[Mark, Henry]           |
  |45 |[Penny, Stewart, George]|
  +---+------------------------+

  arrDF.printSchema()
  root
  |-- Age: integer (nullable = false)
  |-- EmployeeName: array (nullable = true)
  |    |-- element: string (containsNull = true)

Here you can see that the EmployeeName column is of type array. When we print the DataFrame, the array column is shown inside square brackets as comma-separated values. To convert each element into a separate row we can use the explode() function.

Convert ArrayType column into Rows using explode in Spark SQL

Here we will see how we can convert each element in an Array into Rows using explode.

The syntax is as follows: the explode function is used inside withColumn, i.e. df.withColumn("newColNm", explode(col("oldColNm")))

 val arr = Seq((43,Array("Mark","Henry")) , (45,Array("Penny","Stewart","George")))
 val arrDF = spark.sparkContext.parallelize(arr).toDF("Age","EmployeeName")

 val explodeDF = arrDF.withColumn("EmpNm",explode(col("EmployeeName"))).drop("EmployeeName")

  explodeDF.show(false)
  +---+-------+
  |Age|EmpNm  |
  +---+-------+
  |43 |Mark   |
  |43 |Henry  |
  |45 |Penny  |
  |45 |Stewart|
  |45 |George |
  +---+-------+
  explodeDF.printSchema()
  root
  |-- Age: integer (nullable = false)
  |-- EmpNm: string (nullable = true)
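One thing to keep in mind is that explode() produces no output row for a null or empty array, so that Age would silently disappear from the result. The sketch below compares it with explode_outer(), which keeps the row and puts null in the new column. The row with Age 50 and the local session settings are assumptions added for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("ExplodeOuterSketch").master("local[*]").getOrCreate()
import spark.implicits._

// Age 50 has an empty array (hypothetical row, not in the original data).
val withEmpty = Seq(
  (43, Seq("Mark", "Henry")),
  (50, Seq.empty[String])
).toDF("Age", "EmployeeName")

// explode: the Age 50 row is dropped because there is nothing to explode.
val explodedDF = withEmpty.select(col("Age"), explode(col("EmployeeName")).as("EmpNm"))

// explode_outer: the Age 50 row is kept with EmpNm = null.
val outerDF = withEmpty.select(col("Age"), explode_outer(col("EmployeeName")).as("EmpNm"))

val explodedCount = explodedDF.count()
val outerCount = outerDF.count()
val nullNameCount = outerDF.filter(col("EmpNm").isNull).count()

explodedDF.show(false)
outerDF.show(false)
```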

Convert String delimited column into ArrayType using Spark SQL

If we have a string column with some delimiter, we can convert it into an array and then explode the data to create multiple rows.

To first convert the String to an Array we need to use the split() function along with withColumn. Let’s see an example below where the employee names are present in a comma-separated string.

Syntax: df.withColumn("newColNm", split(col("oldColNm"), "delimiter"))

 val arr = Seq((43,("Mark,Henry")) , (45,("Penny,Stewart,George")))
 val arrDF = spark.sparkContext.parallelize(arr).toDF("Age","EmployeeName")

//convert string to array using split function
 val splitDF = arrDF.withColumn("EmpNmSplit",split(col("EmployeeName"),",")).drop("EmployeeName")
  splitDF.show(false)
  +---+------------------------+
  |Age|EmpNmSplit              |
  +---+------------------------+
  |43 |[Mark, Henry]           |
  |45 |[Penny, Stewart, George]|
  +---+------------------------+

//explode to get desired result
  val explodeDF = splitDF.withColumn("EmpNm",explode(col("EmpNmSplit"))).drop("EmpNmSplit")
  explodeDF.show(false)
  +---+-------+
  |Age|EmpNm  |
  +---+-------+
  |43 |Mark   |
  |43 |Henry  |
  |45 |Penny  |
  |45 |Stewart|
  |45 |George |
  +---+-------+
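The second argument of split() is a regular expression, not a plain string. A comma works as-is, but delimiters that have a special meaning in a regex, such as | or ., need to be escaped. Here is a small sketch with a pipe-delimited string; the data and session settings are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("SplitRegexSketch").master("local[*]").getOrCreate()
import spark.implicits._

// Names separated by a pipe instead of a comma (hypothetical input).
val pipeDF = Seq((43, "Mark|Henry")).toDF("Age", "EmployeeName")

// "\\|" escapes the pipe so it is matched literally;
// an unescaped "|" would be read as regex alternation.
val pipeSplitDF = pipeDF.withColumn("EmpNmSplit", split(col("EmployeeName"), "\\|"))

val names = pipeSplitDF.select("EmpNmSplit").first().getSeq[String](0)

pipeSplitDF.show(false)
```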

Applying array_contains method on ArrayType column

The array_contains method checks whether the array contains a specific element. It returns true if the element is present and false otherwise.

Syntax: df.withColumn("newColNm", array_contains(col("oldColNm"), value))

Let’s check this with an example. We will use the same dataframe which we have used before as well. We want to check if any name in the array is “Mark”.

 val arr = Seq((43,Array("Mark","Henry")) , (45,Array("Penny","Stewart","George")))
 val arrDF = spark.sparkContext.parallelize(arr).toDF("Age","EmployeeName")

 val empDF = arrDF.withColumn("EmpNmMark",array_contains(col("EmployeeName"),"Mark"))

  empDF.show(false)
+---+------------------------+---------+
|Age|EmployeeName            |EmpNmMark|
+---+------------------------+---------+
|43 |[Mark, Henry]           |true     |
|45 |[Penny, Stewart, George]|false    |
+---+------------------------+---------+

As you can see in the above result, the row with Age 43 contains Mark, so it returns true, while the row with Age 45 does not, so it returns false.
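Since array_contains returns a boolean column, it can also be used directly as a filter condition instead of adding a new column. A minimal sketch on the same data, assuming a local session:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("ArrayContainsFilterSketch").master("local[*]").getOrCreate()
import spark.implicits._

val arrDF = Seq(
  (43, Array("Mark", "Henry")),
  (45, Array("Penny", "Stewart", "George"))
).toDF("Age", "EmployeeName")

// Keep only the rows whose array contains "Mark".
val markDF = arrDF.filter(array_contains(col("EmployeeName"), "Mark"))

val markAges = markDF.select("Age").as[Int].collect().toSeq

markDF.show(false)
```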

Note: All the array functions discussed below are available from Spark 2.4.0.

array_distinct method in Spark Scala

Using the array_distinct() method you can remove duplicates from an array column. Let’s see an example.

  val arr = Seq(Array(1,2,3,3,4,5,6,5))
  val arrDF = spark.sparkContext.parallelize(arr).toDF("Nums")
  arrDF.withColumn("NumsDistinct" , array_distinct(col("Nums"))).show(false)

  +------------------------+------------------+
  |Nums                    |NumsDistinct      |
  +------------------------+------------------+
  |[1, 2, 3, 3, 4, 5, 6, 5]|[1, 2, 3, 4, 5, 6]|
  +------------------------+------------------+

array_except method in Spark Scala

Returns an array of the elements that are in the first array but not in the second array, without duplicates. The order of elements in the result is not guaranteed. Let’s see an example.

  val arr = Seq((Array(1,2,3,4,5),Array(3,4,5,6)))
  val arrDF = spark.sparkContext.parallelize(arr).toDF("Nums1","Nums2")
  arrDF.printSchema()
  arrDF.withColumn("Nums" , array_except(col("Nums1"),col("Nums2"))).show(false)

  +---------------+------------+------------+
  |Nums1          |Nums2       |Nums        |
  +---------------+------------+------------+
  |[1, 2, 3, 4, 5]|[3, 4, 5, 6]|[1, 2]      |
  +---------------+------------+------------+

array_intersect method in Spark Scala

Returns an array of the elements common to the two given arrays, without duplicates. Let’s see an example.

  val arr = Seq((Array(1,2,3,4,5),Array(3,4,5,6)))
  val arrDF = spark.sparkContext.parallelize(arr).toDF("Nums1","Nums2")
  arrDF.printSchema()
  arrDF.withColumn("Nums" , array_intersect(col("Nums1"),col("Nums2"))).show(false)

  +---------------+------------+------------+
  |Nums1          |Nums2       |Nums        |
  +---------------+------------+------------+
  |[1, 2, 3, 4, 5]|[3, 4, 5, 6]|[3, 4, 5]   |
  +---------------+------------+------------+

array_min/max method in Spark Scala

array_min returns the minimum value in the array and array_max returns the maximum value. Let’s see an example of both.

  val arr = Seq(Array(1,2,3,4,5))
  val arrDF = spark.sparkContext.parallelize(arr).toDF("Nums1")
  arrDF.printSchema()
  arrDF.withColumn("NumsMin" , array_min(col("Nums1"))).show(false)
  +---------------+-------+
  |Nums1          |NumsMin|
  +---------------+-------+
  |[1, 2, 3, 4, 5]|1      |
  +---------------+-------+
  arrDF.withColumn("NumsMax" , array_max(col("Nums1"))).show(false)
  +---------------+-------+
  |Nums1          |NumsMax|
  +---------------+-------+
  |[1, 2, 3, 4, 5]|5      |
  +---------------+-------+

array_remove method in Spark Scala

Removes all elements equal to the given element from the array. Let’s see an example.

  val arr = Seq(Array(1,2,3,4,5,3))
  val arrDF = spark.sparkContext.parallelize(arr).toDF("Nums1")
  arrDF.printSchema()
  arrDF.withColumn("Nums" , array_remove(col("Nums1"),3)).show(false)

  +------------------+------------+
  |Nums1             |Nums        |
  +------------------+------------+
  |[1, 2, 3, 4, 5, 3]|[1, 2, 4, 5]|
  +------------------+------------+

array_sort method in Spark Scala

Sorts the input array in ascending order. The elements of the input array must be orderable. Null elements will be placed at the end of the returned array.

  val arr = Seq(Array(1,2,3,4,5,3))
  val arrDF = spark.sparkContext.parallelize(arr).toDF("Nums1")
  arrDF.printSchema()
  arrDF.withColumn("Nums" , array_sort(col("Nums1"))).show(false)

  +------------------+------------------+
  |Nums1             |Nums              |
  +------------------+------------------+
  |[1, 2, 3, 4, 5, 3]|[1, 2, 3, 3, 4, 5]|
  +------------------+------------------+
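To see where nulls end up, and how to get a descending order, the sketch below compares array_sort with the older sort_array function, which takes an asc flag. As far as I know, sort_array puts nulls first in ascending order and last in descending order. The string data and session settings are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("ArraySortSketch").master("local[*]").getOrCreate()
import spark.implicits._

// An array with a null element (hypothetical data).
val sortDF = Seq((1, Array("b", null, "a"))).toDF("id", "Letters")

val sortedRow = sortDF.select(
  array_sort(col("Letters")),              // ascending, nulls at the end
  sort_array(col("Letters")),              // ascending, nulls at the beginning
  sort_array(col("Letters"), asc = false)  // descending, nulls at the end
).first()

val arraySortResult = sortedRow.getSeq[String](0)
val sortArrayAsc = sortedRow.getSeq[String](1)
val sortArrayDesc = sortedRow.getSeq[String](2)
```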

array_position method in Spark Scala

Locates the position of the first occurrence of the value in the given array and returns it as a long. Returns null if either of the arguments is null.

Note: The position is 1-based, not zero-based. It returns 0 if the value is not found in the array.

  val arr = Seq(Array(1,6,3,5,8))
  val arrDF = spark.sparkContext.parallelize(arr).toDF("Nums1")
  arrDF.printSchema()
  arrDF.withColumn("NumsPosition" , array_position(col("Nums1"),5)).show(false)

  +---------------+------------+
  |Nums1          |NumsPosition|
  +---------------+------------+
  |[1, 6, 3, 5, 8]|4           |
  +---------------+------------+
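The "not found" case from the note is easy to check. A small sketch, assuming a local session, that looks up one value that exists and one that does not (7 is an arbitrary value that is not in the array):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("ArrayPositionSketch").master("local[*]").getOrCreate()
import spark.implicits._

val posDF = Seq(Array(1, 6, 3, 5, 8)).toDF("Nums1")

val posRow = posDF.select(
  array_position(col("Nums1"), 5).as("PosOf5"), // 5 is the 4th element (1-based)
  array_position(col("Nums1"), 7).as("PosOf7")  // 7 is not in the array
).first()

// array_position returns a long column.
val posOf5 = posRow.getLong(0)
val posOf7 = posRow.getLong(1)
```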

array_repeat method in Spark Scala

Creates an array containing the left argument repeated the number of times given by the right argument.

  val arr = Seq(3)
  val arrDF = spark.sparkContext.parallelize(arr).toDF("Nums1")
  arrDF.printSchema()
  arrDF.withColumn("NewArray" , array_repeat(col("Nums1"),5)).show(false)

  +-----+---------------+
  |Nums1|NewArray       |
  +-----+---------------+
  |3    |[3, 3, 3, 3, 3]|
  +-----+---------------+

array_union method in Spark Scala

Returns an array of the elements in the union of the given two arrays, without duplicates.

  val arr1 = Seq((Array(1,2,3),Array(3,4,5)))
  val arrDF = spark.sparkContext.parallelize(arr1).toDF("Nums1","Nums2")
  arrDF.printSchema()
  arrDF.withColumn("NewArray" , array_union(col("Nums1"),col("Nums2"))).show(false)

  +---------+---------+---------------+
  |Nums1    |Nums2    |NewArray       |
  +---------+---------+---------------+
  |[1, 2, 3]|[3, 4, 5]|[1, 2, 3, 4, 5]|
  +---------+---------+---------------+

array_join method in Spark Scala

Concatenates the elements of the array column using the delimiter and returns a string. Null values are replaced with nullReplacement.

  val arr1 = Seq(Array("Hi",null,"Hello"))
  val arrDF = spark.sparkContext.parallelize(arr1).toDF("Nums1")
  arrDF.printSchema()
  arrDF.withColumn("NewArray" , array_join(col("Nums1"),",","nothing")).show(false)

  +------------+----------------+
  |Nums1       |NewArray        |
  +------------+----------------+
  |[Hi,, Hello]|Hi,nothing,Hello|
  +------------+----------------+
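array_join also has a two-argument form without nullReplacement; in that case null elements are simply skipped. A short sketch comparing both forms, assuming a local session (the column name Words is made up for illustration):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("ArrayJoinSketch").master("local[*]").getOrCreate()
import spark.implicits._

val joinDF = Seq((1, Array("Hi", null, "Hello"))).toDF("id", "Words")

val joinRow = joinDF.select(
  array_join(col("Words"), ",").as("NullsSkipped"),             // nulls are left out
  array_join(col("Words"), ",", "nothing").as("NullsReplaced")  // nulls become "nothing"
).first()

val nullsSkipped = joinRow.getString(0)
val nullsReplaced = joinRow.getString(1)
```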

arrays_overlap method in Spark Scala

Returns true if the two arrays have at least one non-null element in common. If they have no common element, both arrays are non-empty, and either of them contains a null, it returns null. Otherwise it returns false.

  val arr1 = Seq((Array("Hi","Hello"),Array("Bye","Hi")))
  val arrDF = spark.sparkContext.parallelize(arr1).toDF("Nums1","Nums2")
  arrDF.printSchema()
  arrDF.withColumn("NewArray" , arrays_overlap(col("Nums1"),col("Nums2"))).show(false)

  +-----------+---------+--------+
  |Nums1      |Nums2    |NewArray|
  +-----------+---------+--------+
  |[Hi, Hello]|[Bye, Hi]|true    |
  +-----------+---------+--------+
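The three possible outcomes (true, false and null) are easier to follow side by side. The sketch below, assuming a local session, builds one row for each case; the second and third rows are made-up data for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("ArraysOverlapSketch").master("local[*]").getOrCreate()
import spark.implicits._

val overlapDF = Seq(
  (Array("Hi", "Hello"), Array("Bye", "Hi")),   // "Hi" is shared
  (Array("Hi", "Hello"), Array("Bye", "Ciao")), // nothing shared, no nulls
  (Array("Hi", null), Array("Bye", "Ciao"))     // nothing shared, one array has a null
).toDF("Nums1", "Nums2")

val resultDF = overlapDF.withColumn("Overlap", arrays_overlap(col("Nums1"), col("Nums2")))

// Map each result to Option[Boolean] so that a SQL null shows up as None.
val overlaps = resultDF.select("Overlap").collect().toSeq
  .map(r => if (r.isNullAt(0)) None else Some(r.getBoolean(0)))

resultDF.show(false)
```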

Conclusion

So today we learnt about the ArrayType complex data type in Spark and saw how to use various array functions.


🙂 kudos for learning something new 🙂
