PySpark Read CSV file into Spark Dataframe

PySpark Read CSV file: In this tutorial, I will explain how to create a Spark DataFrame from a CSV file.
Introduction
CSV is a widely used format for storing and exchanging data. The spark.read.csv() function in PySpark lets you read a CSV file into a PySpark DataFrame. In this tutorial, we will see how to read one or more CSV files from a local directory and apply the different transformations made possible by the options of this function.
If you need to install Spark on your machine, you can consult the beginning of this tutorial:
PySpark read.csv() Syntax
To illustrate the different examples, we will use this file, which contains the list of the different Pokémon. You can download it via this link:
This file contains 13 columns which are as follows :
- Index
- Name
- Type1
- Type2
- Total
- HP
- Attack
- Defense
- SpecialAtk
- SpecialDef
- Speed
- Generation
- Legendary
The basic syntax for using the spark.read.csv() function is as follows:
# Read the CSV file located at the given path
spark.read.csv("path")
To read our example CSV file, proceed as follows:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType
# Create the SparkSession
spark = SparkSession.builder.appName('pyspark - example read csv').getOrCreate()
sc = spark.sparkContext
# Read the CSV file without any options
df = spark.read.csv("amiradata/pokedex.csv")
df.printSchema()
df.show(5,False)
# Result of the printSchema()
root
|-- _c0: string (nullable = true)
|-- _c1: string (nullable = true)
|-- _c2: string (nullable = true)
|-- _c3: string (nullable = true)
|-- _c4: string (nullable = true)
|-- _c5: string (nullable = true)
|-- _c6: string (nullable = true)
|-- _c7: string (nullable = true)
|-- _c8: string (nullable = true)
|-- _c9: string (nullable = true)
|-- _c10: string (nullable = true)
|-- _c11: string (nullable = true)
|-- _c12: string (nullable = true)
# Result of show() function
+-----+---------------------+-----+------+-----+---+------+-------+----------+----------+-----+----------+---------+
|_c0 |_c1 |_c2 |_c3 |_c4 |_c5|_c6 |_c7 |_c8 |_c9 |_c10 |_c11 |_c12 |
+-----+---------------------+-----+------+-----+---+------+-------+----------+----------+-----+----------+---------+
|Index|Name |Type1|Type2 |Total|HP |Attack|Defense|SpecialAtk|SpecialDef|Speed|Generation|Legendary|
|1 |Bulbasaur |Grass|Poison|318 |45 |49 |49 |65 |65 |45 |1 |False |
|2 |Ivysaur |Grass|Poison|405 |60 |62 |63 |80 |80 |60 |1 |False |
|3 |Venusaur |Grass|Poison|525 |80 |82 |83 |100 |100 |80 |1 |False |
|3 |VenusaurMega Venusaur|Grass|Poison|625 |80 |100 |123 |122 |120 |80 |1 |False |
+-----+---------------------+-----+------+-----+---+------+-------+----------+----------+-----+----------+---------+
only showing top 5 rows
By default, when only the file path is specified, the header option is set to False even though the file contains a header on its first line; as you can see above, the header row is read as data. All columns are also typed as strings. To solve these problems, the read.csv() function takes several optional arguments, the most common of which are:
- header : uses the first line as column names. By default, the value is False.
- sep : sets the separator between fields and values. By default, the value is a comma.
- schema : an optional pyspark.sql.types.StructType for the input schema, or a DDL-formatted string.
- path : string, or list of strings, for input path(s), or RDD of Strings storing CSV rows.
You will find the complete list of parameters on the official Spark website.
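As a quick illustration, here is a minimal sketch combining the header and sep options. The semicolon-separated file name is hypothetical and only serves to show the syntax:
# Hypothetical semicolon-separated version of the file, for illustration only
df = spark.read.csv("amiradata/pokedex_semicolon.csv", header=True, sep=";")
df.show(5, False)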
Read CSV file using header record
If your file already contains a header on the first line, you must specify it explicitly by setting the header parameter to True.
# Set the header option to True
df = spark.read.csv("amiradata/pokedex.csv", header=True)
df.printSchema()
root
|-- Index: string (nullable = true)
|-- Name: string (nullable = true)
|-- Type1: string (nullable = true)
|-- Type2: string (nullable = true)
|-- Total: string (nullable = true)
|-- HP: string (nullable = true)
|-- Attack: string (nullable = true)
|-- Defense: string (nullable = true)
|-- SpecialAtk: string (nullable = true)
|-- SpecialDef: string (nullable = true)
|-- Speed: string (nullable = true)
|-- Generation: string (nullable = true)
|-- Legendary: string (nullable = true)
With printSchema(), we can see that the header has been taken into account: the column names now come from the first line of the file. Note, however, that every column is still typed as a string.
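Before building a schema by hand, note that Spark can also try to guess the column types itself with the inferSchema option, at the cost of an extra pass over the data. A minimal sketch:
# Let Spark infer the column types (requires an extra pass over the file)
df = spark.read.csv("amiradata/pokedex.csv", header=True, inferSchema=True)
df.printSchema()
The custom schema approach described below remains preferable when you want full control over the types.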
Read CSV file using a custom schema
As we have seen, by default all columns are considered as strings. If we want to change this, we can define a schema using a StructType. Once our structure is created, we can pass it to the schema parameter of the read.csv() function.
# Schema of the table
schema = StructType() \
    .add("Index", IntegerType(), True) \
    .add("Name", StringType(), True) \
    .add("Type1", StringType(), True) \
    .add("Type2", StringType(), True) \
    .add("Total", IntegerType(), True) \
    .add("HP", IntegerType(), True) \
    .add("Attack", IntegerType(), True) \
    .add("Defense", IntegerType(), True) \
    .add("SpecialAtk", IntegerType(), True) \
    .add("SpecialDef", IntegerType(), True) \
    .add("Speed", IntegerType(), True) \
    .add("Generation", IntegerType(), True) \
    .add("Legendary", BooleanType(), True)
df = spark.read.csv("amiradata/pokedex.csv", header=True, schema=schema)
df.printSchema()
root
|-- Index: integer (nullable = true)
|-- Name: string (nullable = true)
|-- Type1: string (nullable = true)
|-- Type2: string (nullable = true)
|-- Total: integer (nullable = true)
|-- HP: integer (nullable = true)
|-- Attack: integer (nullable = true)
|-- Defense: integer (nullable = true)
|-- SpecialAtk: integer (nullable = true)
|-- SpecialDef: integer (nullable = true)
|-- Speed: integer (nullable = true)
|-- Generation: integer (nullable = true)
|-- Legendary: boolean (nullable = true)
As you can see, the schema has changed and the columns now have the types we specified in our StructType.
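As mentioned in the list of options, the schema parameter also accepts a DDL-formatted string instead of a StructType. Here is a minimal sketch of the same schema in that form:
# Same schema expressed as a DDL-formatted string
ddl_schema = "Index INT, Name STRING, Type1 STRING, Type2 STRING, Total INT, " \
             "HP INT, Attack INT, Defense INT, SpecialAtk INT, SpecialDef INT, " \
             "Speed INT, Generation INT, Legendary BOOLEAN"
df = spark.read.csv("amiradata/pokedex.csv", header=True, schema=ddl_schema)
df.printSchema()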
Read multiple CSV files
With this function, it is also possible to read several files at once, either by listing the paths of each file or by specifying the folder in which your different files are located:
# Reads the 3 files passed as a list of paths
df = spark.read.csv(["amiradata/pokedex.csv", "amiradata/pokedex2.csv", "amiradata/pokedex3.csv"])
# Reads all the files in the folder
df = spark.read.csv("amiradata/")
Conclusion
In this tutorial, we have learned how to read a CSV file into a DataFrame using the read.csv() function in Spark. This function is very useful, and we have only seen a small part of the options it offers.
In our next article, we will see how to write a PySpark DataFrame to a CSV file. Stay tuned! 🙂