Master Data Wrangling with PySpark: A Comprehensive Cheat Sheet
PySpark offers a powerful and scalable solution for data wrangling tasks, especially when dealing with large datasets. Compared to traditional Python libraries like pandas, PySpark excels in:
- Distributed Processing: PySpark leverages the power of clusters to process data in parallel, significantly improving performance for big data workloads.
- Fault Tolerance: PySpark can automatically handle machine failures and recompute lost tasks, ensuring data processing reliability.
- Scalability: PySpark seamlessly scales up or down based on your data volume and computational needs.
Unleash the power of Apache Spark for efficient data manipulation! This PySpark cheat sheet equips you with the essential techniques to conquer data wrangling challenges. From filtering and imputing missing values to exploring and transforming your data, this guide provides a concise roadmap for beginning your PySpark journey.
Table of Contents
- 1. Creating a Spark Session Object:
- 2. Reading data from files:
- 3. Converting a pandas DataFrame to a PySpark DataFrame:
- 4. Exploring data stored in PySpark DataFrames:
- 5. Filtering rows and columns from a DataFrame:
- 6. Renaming Columns:
- 7. Imputing missing values in a DataFrame:
- 8. Removing duplicate rows from a DataFrame:
- 9. Displaying distinct values from a column:
- 10. Creating a column with a constant value in a DataFrame:
- 11. Rounding values in a column of a DataFrame:
- 12. Typecasting columns in a DataFrame:
- 13. Creating a Temporary view from a DataFrame:
- 14. Stopping/Ending a Spark Session:
1. Creating a Spark Session Object:
The PySpark API provides a Python wrapper around the Spark framework. The SparkSession object is the entry point through which the DataFrame and SQL functionality of Spark is accessed.
# import statements:
import pyspark
from pyspark.sql import SparkSession
# creating the spark session object:
spark = (
    SparkSession
    .builder
    .appName("DataWranglingwithPyspark")  # note the capital "N" in appName
    .getOrCreate()
)
2. Reading data from files:
# reading data from a '.csv' file:
csv_df = (
    spark.read.csv(
        "filepath", header=True, inferSchema=True
    )
)
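# reading data from a '.txt' file:
# spark.read.text() loads each line as one row in a single string column named "value";
# for delimited text files, use spark.read.csv("filepath", sep="\t", header=True, inferSchema=True) instead.
txt_df = spark.read.text("filepath")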
# reading data from a '.json' file:
json_df = spark.read.json("filepath")
3. Converting a pandas DataFrame to a PySpark DataFrame:
# let pd_df be the pandas dataframe:
pyspark_df = spark.createDataFrame(pd_df)
4. Exploring data stored in PySpark DataFrames:
4.1. Displaying the DataFrame:
df.show() # prints the first 20 rows of the dataframe by default.
4.2. Showing the schema of a DataFrame:
df.printSchema() # prints the schema of the dataframe.
4.3. Displaying the shape of a DataFrame:
print(df.count(), len(df.columns)) # prints the (rows, columns) of a dataframe.
4.4. Listing columns of a DataFrame:
df.columns
5. Filtering rows and columns from a DataFrame:
# filtering rows on a condition:
filtered_df = df.filter(df["colname"]>10) # provide the filter condition as an input parameter to the filter method.
# filtering rows based on multiple conditions:
filtered_df = df.filter(
(df["colname_1"]>10) # filter condition 1
& (df["colname_2"] == "India") # filter condition 2
)
# selecting columns from a dataframe:
filtered_df = df.select(["colname_1", "colname_2", "colname_3"])
# dropping columns from a dataframe:
filtered_df = df.drop("colname_1", "colname_2", "colname_3") # drop() takes column names as separate arguments, not as a list.
# dropping rows with missing values:
filtered_df = df.dropna()
6. Renaming Columns:
# renaming columns in a dataframe:
df = df.withColumnRenamed("old_colname", "new_colname")
# renaming multiple columns in a dataframe:
colname_replacements = {
    "old_colname_1": "new_colname_1",
    "old_colname_2": "new_colname_2"
}
for old_colname, new_colname in colname_replacements.items():
    df = df.withColumnRenamed(old_colname, new_colname)
7. Imputing missing values in a DataFrame:
imputation_value = 0
df = df.fillna(imputation_value) # imputes missing values with 0 in numeric columns; columns of other types (e.g. string) are left unchanged.
8. Removing duplicate rows from a DataFrame:
# removing all duplicate rows from a dataframe:
df1 = df.dropDuplicates()
# another way of removing all duplicate rows from a dataframe:
df1 = df.distinct()
# removing all rows with duplicates within specific columns of the dataframe:
df2 = df.dropDuplicates(["col1", "col2"])
9. Displaying distinct values from a column:
# displaying distinct values from a column (distinct() is a DataFrame method, so select the column first):
df.select("colname").distinct().show()
# displaying count of distinct values from a column:
df.select("colname").distinct().count()
10. Creating a column with a constant value in a DataFrame:
from pyspark.sql.functions import lit
# creating column with a constant value in a DataFrame:
constant_value = 0
df = df.withColumn("constant_col", lit(constant_value))
11. Rounding values in a column of a DataFrame:
from pyspark.sql.functions import round # note: this shadows Python's built-in round() in the current module.
# creating a column by rounding values from an existing numeric column:
rounding_places = 2
rounded_df = df.withColumn("rounded_col", round(df["numeric_col"], rounding_places))
12. Typecasting columns in a DataFrame:
from pyspark.sql import functions as f
# converting the datatype of the column to float:
df = df.withColumn("Col_1_float", f.col("Col_1").cast("float"))
# converting the datatype of the column to boolean:
df = df.withColumn("Col_2_bool", f.col("Col_2").cast("boolean"))
# converting the datatype of the column to string:
df = df.withColumn("Col_3_string", f.col("Col_3").cast("string"))
13. Creating a Temporary view from a DataFrame:
# creating the temporary view:
df.createOrReplaceTempView("table_name")
# using spark sql to query the temporary view:
queried_df = spark.sql("SELECT * FROM table_name")
14. Stopping/Ending a Spark Session:
spark.stop() # ends/destroys the spark session object.
🕵🏽 Thank you for reading this document. If you have any feedback, you can email me at amandeepsinghkhanna@gmail.com.