PySpark Explode Nested Array, Array or Map to rows


PySpark Explode: in this tutorial, we will learn how to explode and flatten the array and map columns of a PySpark DataFrame using the different functions available in PySpark.

Introduction

When working with PySpark, we often use semi-structured data such as JSON or XML files. These files can contain array or map elements, which are difficult to process as a single row or column. The explode() function in PySpark handles this kind of data: it returns a new row for each element of an array or map. For a map, it creates a new row for each key-value pair.

This tutorial will explain how to use the following PySpark functions:

explode()
explode_outer()
posexplode()
posexplode_outer()

To start, we need to create a DataFrame. In our example, it has 4 columns:

pokemon_name: the name of the pokémon.
evolves: the list of evolutions of each pokémon, stored as a nested array. Each inner array holds the name of the evolution and its Pokédex number (as a string).
japanese_french_name: an array containing the Japanese and French names of the pokémon.
types: a map containing the primary and secondary type of each pokémon.

Here is the code to create the dataframe mentioned above:


from pyspark.sql import SparkSession

# Create (or reuse) a Spark session
spark = SparkSession.builder.appName('pyspark - example explode()').getOrCreate()

# Each row: name, nested array of [evolution, Pokédex number],
# [Japanese name, French name] and a map of types
database_poke = [
  ("Bulbasaur",[["Ivysaur","2"],["Venusaur","3"]],['Fushigidane','Bulbizarre'],{'Primary_type':'Grass','Secondary_type':'Poison'}),
  ("Charmander",[["Charmeleon","5"],["Charizard","6"]],['Hitokage','Salameche'],{'Primary_type':'Fire','Secondary_type':'Fire'}),
  ("Squirtle",[["Wartortle","8"],["Blastoise","9"]],['Zenigame','Carapuce'],{'Primary_type':'Water','Secondary_type':'Water'}),
  ("Caterpie",[["Metapod","11"],["Butterfree","12"]],['Kyatapī','Chenipan'],{'Primary_type':'Bug','Secondary_type':'Bug'})
]

# Column types are inferred from the Python lists and dictionaries
df = spark.createDataFrame(data=database_poke, schema=['pokemon_name','evolves','japanese_french_name','types'])
df.printSchema()
df.show(4, truncate=False)

The printSchema() and show() calls display the schema and the rows of the DataFrame:

root
 |-- pokemon_name: string (nullable = true)
 |-- evolves: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- japanese_french_name: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- types: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+------------+---------------------------------+-------------------------+-------------------------------------------------+
|pokemon_name|evolves                          |japanese_french_name     |types                                            |
+------------+---------------------------------+-------------------------+-------------------------------------------------+
|Bulbasaur   |[[Ivysaur, 2], [Venusaur, 3]]    |[Fushigidane, Bulbizarre]|[Secondary_type -> Poison, Primary_type -> Grass]|
|Charmander  |[[Charmeleon, 5], [Charizard, 6]]|[Hitokage, Salameche]    |[Secondary_type -> Fire, Primary_type -> Fire]   |
|Squirtle    |[[Wartortle, 8], [Blastoise, 9]] |[Zenigame, Carapuce]     |[Secondary_type -> Water, Primary_type -> Water] |
|Caterpie    |[[Metapod, 11], [Butterfree, 12]]|[Kyatapī, Chenipan]      |[Secondary_type -> Bug, Primary_type -> Bug]     |
+------------+---------------------------------+-------------------------+-------------------------------------------------+

PySpark Explode Nested Array Column to Rows

In this first part, we are going to explode a column containing a nested array.

The explode() function is available in the pyspark.sql.functions module, so you have to import it from there:

# Import module

from pyspark.sql.functions import explode

Now we can use this function to explode our “evolves” column:

# Explode() function

from pyspark.sql.functions import explode
df.select(df.pokemon_name,explode(df.evolves)).show(truncate=False)

This produces the following result:

+------------+----------------+
|pokemon_name|col             |
+------------+----------------+
|Bulbasaur   |[Ivysaur, 2]    |
|Bulbasaur   |[Venusaur, 3]   |
|Charmander  |[Charmeleon, 5] |
|Charmander  |[Charizard, 6]  |
|Squirtle    |[Wartortle, 8]  |
|Squirtle    |[Blastoise, 9]  |
|Caterpie    |[Metapod, 11]   |
|Caterpie    |[Butterfree, 12]|
+------------+----------------+

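The function creates a new column called “col” and produces one row for each element of the outer array. Only one level is exploded: each value in “col” is still an array containing the name of an evolution and its number.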

PySpark Explode Array or Map Column to Rows

We have just exploded a nested array, but explode() works the same way on a column containing a simple array or a map, spreading its elements over several rows. By default, explode() ignores null values: a row whose array or map is null or empty produces no output row.

Explode Array Column

When an array is passed to explode(), it creates a new column, named “col” by default, that contains the elements of the array, one per row.

# Explode Array Column

from pyspark.sql.functions import explode

df.select(df.pokemon_name,explode(df.japanese_french_name)).show(truncate=False)
+------------+-----------+
|pokemon_name|col        |
+------------+-----------+
|Bulbasaur   |Fushigidane|
|Bulbasaur   |Bulbizarre |
|Charmander  |Hitokage   |
|Charmander  |Salameche  |
|Squirtle    |Zenigame   |
|Squirtle    |Carapuce   |
|Caterpie    |Kyatapī    |
|Caterpie    |Chenipan   |
+------------+-----------+

Explode Map Column

When a map is passed to explode(), it creates two new columns, “key” and “value”, and one row for each key-value pair.

# Explode Map Column

from pyspark.sql.functions import explode

df.select(df.pokemon_name,explode(df.types)).show(truncate=False)
+------------+--------------+------+
|pokemon_name|key           |value |
+------------+--------------+------+
|Bulbasaur   |Secondary_type|Poison|
|Bulbasaur   |Primary_type  |Grass |
|Charmander  |Secondary_type|Fire  |
|Charmander  |Primary_type  |Fire  |
|Squirtle    |Secondary_type|Water |
|Squirtle    |Primary_type  |Water |
|Caterpie    |Secondary_type|Bug   |
|Caterpie    |Primary_type  |Bug   |
+------------+--------------+------+

PySpark Explode : explode_outer(), posexplode(), posexplode_outer() 

PySpark SQL explode_outer() Function

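Like explode(), this function creates a row for each element of the array or map. The difference is that when the array or map is null or empty, explode_outer() still returns a row, with null in the exploded column(s), instead of dropping it.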

from pyspark.sql.functions import explode_outer

df.select(df.pokemon_name,explode_outer(df.types)).show()
+------------+--------------+------+
|pokemon_name|           key| value|
+------------+--------------+------+
|   Bulbasaur|Secondary_type|Poison|
|   Bulbasaur|  Primary_type| Grass|
|  Charmander|Secondary_type|  Fire|
|  Charmander|  Primary_type|  Fire|
|    Squirtle|Secondary_type| Water|
|    Squirtle|  Primary_type| Water|
|    Caterpie|Secondary_type|   Bug|
|    Caterpie|  Primary_type|   Bug|
+------------+--------------+------+

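It works the same way for an array. Since none of the maps in our DataFrame is null or empty, the output is identical to that of explode().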

PySpark SQL posexplode() Function

We have seen that explode() returns one column for an array and two columns for a map. posexplode() adds an extra column, “pos”, containing the position of each element in the array or map, starting from 0.

# Get the position of each element

from pyspark.sql.functions import posexplode
df.select(df.pokemon_name,posexplode(df.japanese_french_name)).show()
df.select(df.pokemon_name,posexplode(df.types)).show()
+------------+---+-----------+
|pokemon_name|pos|        col|
+------------+---+-----------+
|   Bulbasaur|  0|Fushigidane|
|   Bulbasaur|  1| Bulbizarre|
|  Charmander|  0|   Hitokage|
|  Charmander|  1|  Salameche|
|    Squirtle|  0|   Zenigame|
|    Squirtle|  1|   Carapuce|
|    Caterpie|  0|    Kyatapī|
|    Caterpie|  1|   Chenipan|
+------------+---+-----------+

+------------+---+--------------+------+
|pokemon_name|pos|           key| value|
+------------+---+--------------+------+
|   Bulbasaur|  0|Secondary_type|Poison|
|   Bulbasaur|  1|  Primary_type| Grass|
|  Charmander|  0|Secondary_type|  Fire|
|  Charmander|  1|  Primary_type|  Fire|
|    Squirtle|  0|Secondary_type| Water|
|    Squirtle|  1|  Primary_type| Water|
|    Caterpie|  0|Secondary_type|   Bug|
|    Caterpie|  1|  Primary_type|   Bug|
+------------+---+--------------+------+

PySpark SQL posexplode_outer() Function

posexplode_outer() follows the same principle as posexplode(), except that when the array or map is null or empty, it still returns a row, with null in both the pos and col columns. For a map, the pos, key and value columns are all null.

# Get the position of each element - posexplode_outer() function

from pyspark.sql.functions import posexplode_outer

df.select(df.pokemon_name,posexplode_outer(df.japanese_french_name)).show()
df.select(df.pokemon_name,posexplode_outer(df.types)).show()

Conclusion

In this article, we used explode() and its variants explode_outer(), posexplode() and posexplode_outer() from PySpark SQL to convert array and map columns into rows. I hope this will be useful for your learning or your projects!


Categorized as Python

By ayed_amira

I'm a data scientist passionate about new technologies and programming. I created this website mainly for people who want to learn more about data science and programming :)
