PySpark Explode: Nested Array, Array, or Map to Rows

PySpark Explode: In this tutorial, we will learn how to explode and flatten the columns of a PySpark dataframe using the different functions available in PySpark.
Introduction
When working with PySpark, we often use semi-structured data such as JSON or XML files. These file formats can contain arrays or map elements, which are difficult to process as a single row or column. The explode() function available in PySpark handles this processing and makes this type of data easier to work with. It returns a new row for each element of an array or map and can also, if desired, create a new row for each key-value pair of a map.
This tutorial will explain how to use the following PySpark functions: explode(), explode_outer(), posexplode(), and posexplode_outer().
To start we will need to create a dataframe. In our example, our dataframe will be composed of 4 columns:
pokemon_name: Contains the name of the pokemon
evolves: This column contains the list of evolutions of each pokémon, presented as a nested array.
japanese_french_name: This is an array containing the Japanese and French translations of each pokémon's name.
types: This column contains the primary and secondary type of each pokémon. Its type is a map.
Here is the code to create the dataframe mentioned above:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('pyspark - example explode()').getOrCreate()
database_poke = [
("Bulbasaur",[["Ivysaur","2"],["Venusaur","3"]],['Fushigidane','Bulbizarre'],{'Primary_type':'Grass','Secondary_type':'Poison'}),
("Charmander",[["Charmeleon","5"],["Charizard","6"]],['Hitokage','Salameche'],{'Primary_type':'Fire','Secondary_type':'Fire'}),
("Squirtle",[["Wartortle","8"],["Blastoise","9"]],['Zenigame','Carapuce'],{'Primary_type':'Water','Secondary_type':'Water'}),
("Caterpie",[["Metapod","11"],["Butterfree","12"]],['Kyatapī','Chenipan'],{'Primary_type':'Bug','Secondary_type':'Bug'})
]
df = spark.createDataFrame(data=database_poke, schema = ['pokemon_name','evolves','japanese_french_name','types'])
df.printSchema()
df.show(4,truncate=False)
With the printSchema() and show() functions, we can see that the dataframe has the following schema and contents:
root
|-- pokemon_name: string (nullable = true)
|-- evolves: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
|-- japanese_french_name: array (nullable = true)
| |-- element: string (containsNull = true)
|-- types: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
+------------+---------------------------------+-------------------------+-------------------------------------------------+
|pokemon_name|evolves |japanese_french_name |types |
+------------+---------------------------------+-------------------------+-------------------------------------------------+
|Bulbasaur |[[Ivysaur, 2], [Venusaur, 3]] |[Fushigidane, Bulbizarre]|[Secondary_type -> Poison, Primary_type -> Grass]|
|Charmander |[[Charmeleon, 5], [Charizard, 6]]|[Hitokage, Salameche] |[Secondary_type -> Fire, Primary_type -> Fire] |
|Squirtle |[[Wartortle, 8], [Blastoise, 9]] |[Zenigame, Carapuce] |[Secondary_type -> Water, Primary_type -> Water] |
|Caterpie |[[Metapod, 11], [Butterfree, 12]]|[Kyatapī, Chenipan] |[Secondary_type -> Bug, Primary_type -> Bug] |
+------------+---------------------------------+-------------------------+-------------------------------------------------+
PySpark Explode Nested Array Column to Rows
In this first part, we are going to explode a column containing a nested array.
The explode() function lives in the pyspark.sql.functions module, so this module must be imported first:
# Import module
from pyspark.sql.functions import explode
Now we can use this function to explode our “evolves” column:
# Explode() function
from pyspark.sql.functions import explode
df.select(df.pokemon_name,explode(df.evolves)).show(truncate=False)
This has the effect of producing the following result:
+------------+----------------+
|pokemon_name|col |
+------------+----------------+
|Bulbasaur |[Ivysaur, 2] |
|Bulbasaur |[Venusaur, 3] |
|Charmander |[Charmeleon, 5] |
|Charmander |[Charizard, 6] |
|Squirtle |[Wartortle, 8] |
|Squirtle |[Blastoise, 9] |
|Caterpie |[Metapod, 11] |
|Caterpie |[Butterfree, 12]|
+------------+----------------+
The function creates a new column, called “col” by default, and produces a new row for each element of our nested array.
PySpark Explode Array or Map Column to Rows
We have just shown how to explode a nested array; it is also possible to explode a column containing a simple array or a map into several rows. By default, null values are ignored and do not produce new rows.
Explode Array Column
When an array column is passed to the explode() function, a new column named “col” is created by default, holding one element of the array per row.
# Explode Array Column
from pyspark.sql.functions import explode
df.select(df.pokemon_name,explode(df.japanese_french_name)).show(truncate=False)
+------------+-----------+
|pokemon_name|col        |
+------------+-----------+
|Bulbasaur   |Fushigidane|
|Bulbasaur   |Bulbizarre |
|Charmander  |Hitokage   |
|Charmander  |Salameche  |
|Squirtle    |Zenigame   |
|Squirtle    |Carapuce   |
|Caterpie    |Kyatapī    |
|Caterpie    |Chenipan   |
+------------+-----------+
Explode Map Column
When a map column is passed to the explode() function, it creates two new columns, “key” and “value”, with one row per key-value pair.
# Explode Map Column
from pyspark.sql.functions import explode
df.select(df.pokemon_name,explode(df.types)).show(truncate=False)
+------------+--------------+------+
|pokemon_name|key |value |
+------------+--------------+------+
|Bulbasaur |Secondary_type|Poison|
|Bulbasaur |Primary_type |Grass |
|Charmander |Secondary_type|Fire |
|Charmander |Primary_type |Fire |
|Squirtle |Secondary_type|Water |
|Squirtle |Primary_type |Water |
|Caterpie |Secondary_type|Bug |
|Caterpie |Primary_type |Bug |
+------------+--------------+------+
PySpark Explode: explode_outer(), posexplode(), posexplode_outer()
PySpark SQL explode_outer() Function
This function also creates a row for each element of an array or map, but unlike explode(), it returns a row with null values when the array or map is itself null or empty, instead of dropping the row.
from pyspark.sql.functions import explode_outer
df.select(df.pokemon_name,explode_outer(df.types)).show()
+------------+--------------+------+
|pokemon_name| key| value|
+------------+--------------+------+
| Bulbasaur|Secondary_type|Poison|
| Bulbasaur| Primary_type| Grass|
| Charmander|Secondary_type| Fire|
| Charmander| Primary_type| Fire|
| Squirtle|Secondary_type| Water|
| Squirtle| Primary_type| Water|
| Caterpie|Secondary_type| Bug|
| Caterpie| Primary_type| Bug|
+------------+--------------+------+
It works the same way for an array column.
PySpark SQL posexplode() Function
We have seen that explode() returns one column for arrays and two columns for maps. posexplode() additionally adds a “pos” column containing the position of each element within the array or map.
# Get the position of each element
from pyspark.sql.functions import posexplode
df.select(df.pokemon_name,posexplode(df.japanese_french_name)).show()
df.select(df.pokemon_name,posexplode(df.types)).show()
+------------+---+-----------+
|pokemon_name|pos| col|
+------------+---+-----------+
| Bulbasaur| 0|Fushigidane|
| Bulbasaur| 1| Bulbizarre|
| Charmander| 0| Hitokage|
| Charmander| 1| Salameche|
| Squirtle| 0| Zenigame|
| Squirtle| 1| Carapuce|
| Caterpie| 0| Kyatapī|
| Caterpie| 1| Chenipan|
+------------+---+-----------+
+------------+---+--------------+------+
|pokemon_name|pos| key| value|
+------------+---+--------------+------+
| Bulbasaur| 0|Secondary_type|Poison|
| Bulbasaur| 1| Primary_type| Grass|
| Charmander| 0|Secondary_type| Fire|
| Charmander| 1| Primary_type| Fire|
| Squirtle| 0|Secondary_type| Water|
| Squirtle| 1| Primary_type| Water|
| Caterpie| 0|Secondary_type| Bug|
| Caterpie| 1| Primary_type| Bug|
+------------+---+--------------+------+
PySpark SQL posexplode_outer() Function
Same principle as the posexplode() function, except that if the array is null or empty, posexplode_outer() returns a row with null in the pos and col columns instead of dropping it. Similarly for a map, it returns a row with null values.
# Get the position of each element - posexplode_outer() function
from pyspark.sql.functions import posexplode_outer
df.select(df.pokemon_name,posexplode_outer(df.japanese_french_name)).show()
df.select(df.pokemon_name,posexplode_outer(df.types)).show()
Conclusion
In this article, we used the explode functions available in PySpark SQL to convert array and map columns into rows. I hope this will be useful for your learning or your projects!