2024-06-27

Data Processing with PySpark, Delta Lake and AWS EMR

In this post, we'll discuss data processing with PySpark using the Delta Lake format and deploying it on AWS Elastic MapReduce (EMR).

The code for this blog post can be found here. Please do give us a star or follow if it was useful for you.


Contents

  1. Dataset Description
  2. Pre-requisites
  3. Dependencies
  4. Workflow
    1. Creating the SparkSession
    2. Reading data from the CSV File
    3. Creating aggregated tables
    4. Writing to S3 as Delta tables
    5. Upserting in a Delta table
  5. Deploying to AWS EMR
    1. Packaging the workflow
    2. Uploading to S3
    3. Creating the EMR cluster
    4. Running our PySpark workflow
  6. Conclusion

Dataset Description

The dataset we'll be using in this post is a 128 KB CSV file that contains sales data from a shopping mart. It has been sourced from Kaggle Datasets.

Fields that we'll be using are:

  • Branch: Branch code for the mart
  • City: City that the store is located in
  • Product Line: Product category
  • Total: Total sales amount per transaction

There are other fields that may be of interest. Extending the work done in this post to get insights from fields like Gender, Customer type and Payment method is left as an exercise for the reader.

Pre-requisites

The rest of this post assumes basic knowledge of writing PySpark jobs and working with Delta Lake tables, and familiarity with the AWS cloud ecosystem, mostly S3 and EMR.

You'll need:

  • A local Spark cluster for development (you can either install it following the instructions here or use docker)
  • An AWS account with access to EMR, EC2 and S3

Important: Deploying and running the PySpark application on EMR will incur some minor charges as it is a paid managed service from AWS.

Dependencies

We'll be using Poetry to manage our Python environment and dependencies. For this workflow, we'll need the pyspark package for interacting with Spark and the delta-spark package for working with Delta Lake tables.

The pyspark package version should match the version of Spark where your workflow / job will run. I have Spark 3.2.1 installed locally, so I'll install that version into my Python environment.

    poetry add pyspark==3.2.1

For delta-spark, you can check the version compatible with your version of Spark here.
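The version matching described above can be sketched as a small lookup. This is only an illustration: the `DELTA_TO_SPARK` table is a hand-copied excerpt (an assumption that should be verified against the official compatibility table), and the helper names `minor` and `is_compatible` are hypothetical.

```python
# Hand-copied excerpt of Delta Lake -> Spark release lines. This is an
# assumption for illustration; verify it against the official Delta Lake
# releases page before relying on it.
DELTA_TO_SPARK = {
    "1.0": "3.1",
    "1.1": "3.2",
    "1.2": "3.2",
    "2.0": "3.2",
    "2.1": "3.3",
    "2.2": "3.3",
    "2.3": "3.3",
    "2.4": "3.4",
}


def minor(version: str) -> str:
    """Return the 'major.minor' prefix of a version string such as '3.2.1'."""
    return ".".join(version.split(".")[:2])


def is_compatible(delta_version: str, spark_version: str) -> bool:
    """True if the Delta Lake release line targets the given Spark release line."""
    return DELTA_TO_SPARK.get(minor(delta_version)) == minor(spark_version)


# Check the combination used in this post (delta-core 2.0.1 with Spark 3.2.1).
print(is_compatible("2.0.1", "3.2.1"))
```

A check like this could run in CI so that a dependency bump that breaks the pairing is caught before deployment.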

With the local Spark cluster for development up and running and the dependencies installed, let's get into processing our data and writing it out to S3 as a Delta table.

Workflow

Creating the SparkSession

To create our SparkSession, we need to add some configuration to be able to work with Delta lake and to write it out to S3.

We'll be adding the DeltaSparkSessionExtension, setting the spark_catalog to the DeltaCatalog from this extension and configuring Spark to use the S3AFileSystem when writing to S3.

    from pyspark.sql import SparkSession

    # Builder shared by all environments: Delta extension, Delta catalog and S3A
    builder = (
        SparkSession.builder.appName(config.APP_NAME)
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config(
            "spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        )
        .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    )

delta-spark comes with a neat utility to configure the SparkSession with the delta-core jar that is required for working with the delta format. However, this only works when we're not running it on a cluster.

Since we're writing out to AWS, we'll also add in the org.apache.hadoop:hadoop-aws jar. Again, we'll be using the hadoop version that our Spark version has been compiled against. For Spark 3.2.1, the compatible hadoop version is 3.3.1.

SparkSession creation is controlled using the ENVIRONMENT variable. In the DEVELOPMENT environment, we configure the session with the utility from delta-spark.

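    from delta import configure_spark_with_delta_pip

    if config.ENVIRONMENT == "DEVELOPMENT":
        # Locally, let delta-spark add the delta-core jar, plus hadoop-aws for S3
        spark = (
            configure_spark_with_delta_pip(
                builder,
                extra_packages=[
                    "org.apache.hadoop:hadoop-aws:3.3.1",
                ],
            )
            .master("local[*]")
            .getOrCreate()
        )
    else:
        # On the cluster, the jars are supplied at submit time
        spark = builder.getOrCreate()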
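The snippets in this post read settings such as `config.ENVIRONMENT` and `config.APP_NAME` from a `config` module that is not shown. A minimal sketch of what such a module might look like follows; the `Config` class, the `load_config` helper, the environment variable names and every default value are assumptions made for illustration, not the actual module from the repository.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class Config:
    APP_NAME: str
    ENVIRONMENT: str
    # Paths are kept as plain strings: pathlib.Path would collapse the
    # double slash in "s3://bucket/..." into "s3:/bucket/...".
    DATA_PATH: str
    OUT_PATH: str


def load_config(env: dict) -> Config:
    """Build the settings from environment variables, falling back to local defaults."""
    return Config(
        APP_NAME=env.get("APP_NAME", "pyspark-emr-delta"),
        ENVIRONMENT=env.get("ENVIRONMENT", "DEVELOPMENT").upper(),
        DATA_PATH=env.get("DATA_PATH", "data/sales.csv"),
        OUT_PATH=env.get("OUT_PATH", "s3://bucket/delta/total_by_city"),
    )


config = load_config(dict(os.environ))
```

With a layout like this, the same entrypoint runs locally by default and switches to the cluster code path when ENVIRONMENT is set to anything other than DEVELOPMENT.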

Reading data from the CSV File

We'll read the data using the spark session:

    df = (
        spark.read.format("csv")
        .option("inferSchema", True)
        .option("header", True)
        .load(str(config.DATA_PATH))
    )

We set the header option so that the column names are read from the first row of the CSV file. However, the headers are not in snake case, which makes them awkward to work with. Let's rename the columns so they're easier to use.

    # lowercase each header and replace spaces with underscores
    df = df.toDF(*[c.strip().lower().replace(" ", "_") for c in df.columns])
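The rename above only handles spaces. If some headers also contain punctuation, a slightly more defensive normalizer could look like the sketch below. The `to_snake_case` helper and the sample headers are hypothetical and only serve to illustrate the idea.

```python
import re


def to_snake_case(name: str) -> str:
    """Lowercase a header and collapse runs of non-alphanumerics into '_'."""
    cleaned = re.sub(r"[^0-9a-zA-Z]+", "_", name.strip())
    return cleaned.strip("_").lower()


# Hypothetical headers, used only to show the behaviour of the helper.
headers = ["Invoice ID", " Product line ", "Tax 5%", "gross income"]
print([to_snake_case(h) for h in headers])
# ['invoice_id', 'product_line', 'tax_5', 'gross_income']
```

It plugs into the same call: `df.toDF(*[to_snake_case(c) for c in df.columns])`.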

Creating aggregated tables

Next, we'll create some trivial aggregated tables from this data to get:

  • Total sales amount by city
  • Total sales amount by branch
  • Top product lines based on total products sold by city and branch
  • Top 2 products per city and branch

In a production setting, we'd have a staging area for writing out intermediate tables before finally upserting them into the final table to make the swap as close to atomic as possible.
However, for the sake of brevity, we'll be writing these tables out to the main table on S3 directly.

    from pyspark.sql import Window
    from pyspark.sql import functions as f

    # total by city
    total_by_city = (
        df.groupBy(f.col("city"))
        .agg(
            f.round(f.sum(f.col("total")), 2).alias("total_sales_amount"),
            f.count_distinct(f.col("product_line")).alias("num_unique_products"),
        )
        .orderBy(f.col("total_sales_amount").desc())
    )

    # total by branch
    total_by_branch = (
        df.groupBy(
            f.concat(f.col("city"), f.lit("-"), f.col("branch")).alias("city_branch")
        )
        .agg(
            f.round(f.sum(f.col("total")), 2).alias("total_sales_amount"),
            f.count_distinct(f.col("product_line")).alias("num_unique_products"),
        )
        .orderBy(f.col("total_sales_amount").desc())
    )

    # top product lines based on total products sold by city and branch
    top_products = (
        df.groupBy(
            f.concat(f.col("city"), f.lit("-"), f.col("branch")).alias("city_branch"),
            f.col("product_line"),
        )
        .agg(f.count("*").alias("total_products"))
        .orderBy(f.col("total_products").desc())
    )

    # Top 2 products only
    top_2_products = (
        df.groupBy(
            f.concat(f.col("city"), f.lit("-"), f.col("branch")).alias("city_branch"),
            f.col("product_line"),
        )
        .agg(f.count("*").alias("total_products"))
        .select(
            f.col("city_branch"),
            f.col("product_line"),
            f.col("total_products"),
            # rank product lines within each city_branch by number of sales
            f.dense_rank()
            .over(
                Window.partitionBy(f.col("city_branch")).orderBy(
                    f.col("total_products").desc()
                )
            )
            .alias("rnk"),
        )
        .where(f.col("rnk") <= 2)
        .orderBy(f.col("total_products").desc())
    )
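One detail of the top 2 query worth noting: dense_rank gives tied rows the same rank, so a city_branch can return more than two rows when counts are equal. The pure-Python sketch below emulates that ranking to make the behaviour visible without a Spark session. The `dense_rank_top_n` helper and the sample rows are made up for illustration.

```python
from itertools import groupby


def dense_rank_top_n(rows, n=2):
    """Keep rows whose dense rank (by count, descending) within each group is <= n.

    `rows` is an iterable of (group, item, count) tuples.
    """
    result = []
    ordered = sorted(rows, key=lambda r: (r[0], -r[2]))
    for _, members in groupby(ordered, key=lambda r: r[0]):
        rank, previous = 0, None
        for group, item, count in members:
            if count != previous:  # the rank only advances when the value changes
                rank, previous = rank + 1, count
            if rank <= n:
                result.append((group, item, count, rank))
    return result


# Made-up counts: "b" and "c" tie for second place, so three rows survive.
rows = [("G1", "a", 5), ("G1", "b", 3), ("G1", "c", 3), ("G1", "d", 1)]
print(dense_rank_top_n(rows))
# [('G1', 'a', 5, 1), ('G1', 'b', 3, 2), ('G1', 'c', 3, 2)]
```

If exactly two rows per group are required, row_number with a deterministic tie-breaker in the window ordering is the usual alternative.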

Writing to S3 as Delta tables

In a production setting, the data would be written in partitions, usually by date.

    (
        total_by_city
        .write
        .format("delta")
        .mode("overwrite")
        .save(config.OUT_PATH)
    )
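For the partitioned production case mentioned above, a write might look like the following sketch. It assumes a DataFrame that actually contains a date column (the aggregated total_by_city table does not), and the `write_partitioned` helper and its `partition_col` default are hypothetical.

```python
from typing import Any


def write_partitioned(df: Any, path: str, partition_col: str = "date") -> None:
    """Append `df` to the Delta table at `path`, partitioned by `partition_col`.

    `df` is expected to be a pyspark.sql.DataFrame containing `partition_col`.
    """
    (
        df.write
        .format("delta")
        .mode("append")
        .partitionBy(partition_col)
        .save(path)
    )
```

Appending one partition per run keeps earlier dates untouched, whereas the overwrite mode used in this post replaces the whole table.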

Upserting in a Delta table

Let's try to upsert some records into the Delta table.

First, we read it in:

    from delta.tables import DeltaTable

    # load the existing Delta table by path
    df_total_city = DeltaTable.forPath(spark, str(config.OUT_PATH))

    # print to stdout
    df_total_city.toDF().show()

In order to upsert, we need a dataframe with updated values.

    from pyspark.sql.types import (
        DoubleType,
        IntegerType,
        StringType,
        StructField,
        StructType,
    )

    new_df = spark.createDataFrame(
        [("Naypyitaw", 120000.00, 10),
         ("Kansas", 140000.00, 12)],
        StructType(
            [
                StructField("city", StringType(), True),
                StructField("total_sales_amount", DoubleType(), True),
                StructField("num_unique_products", IntegerType(), True),
            ]
        ),
    )

Here, we create two rows with values for the total_sales_amount and num_unique_products columns: one for a city that should already be in the table (Naypyitaw), and one for a city that is not (Kansas).

Let's also emulate a transformation step on this new dataframe. We'll use a trivial UDF (user-defined function) here.

    # utils.py (UDF)
    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType


    def mul_double(val: int) -> int:
        return val * 2


    # register the plain Python function as a Spark UDF returning an integer
    mul_double_udf = udf(mul_double, IntegerType())

Apply it to the num_unique_products column.

    new_df = new_df.select(
        f.col("city"),
        f.col("total_sales_amount"),
        mul_double_udf(f.col("num_unique_products")).alias("num_unique_products"),
    )

To upsert this into the Delta table, we'll update the columns when the cities match, otherwise we insert it as is.

    (
        df_total_city.alias("old_data")
        .merge(source=new_df.alias("new_data"), condition="old_data.city = new_data.city")
        .whenMatchedUpdate(
            condition="new_data.total_sales_amount > 0",
            set={
                "total_sales_amount": f.col("new_data.total_sales_amount"),
                "num_unique_products": f.col("new_data.num_unique_products"),
            },
        )
        .whenNotMatchedInsert(
            values={
                "city": f.col("new_data.city"),
                "total_sales_amount": f.col("new_data.total_sales_amount"),
                "num_unique_products": f.col("new_data.num_unique_products"),
            }
        )
        .execute()  # writes back to the delta table as well
    )

The execute() method writes the result back to the delta table. If we now inspect our Delta table, we should see the updated values.
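To make the merge semantics concrete, here is a pure-Python emulation of the same rules on plain dictionaries keyed by city. It is only a sketch of the logic, not how Delta Lake implements it. The `upsert` helper and the existing table values are made up, while the update values mirror the new dataframe after the UDF has doubled num_unique_products.

```python
def upsert(table: dict, updates: dict) -> dict:
    """Emulate the merge: update matched keys if the condition holds, insert the rest."""
    merged = dict(table)
    for city, row in updates.items():
        if city in merged:
            # whenMatchedUpdate, guarded by new_data.total_sales_amount > 0
            if row["total_sales_amount"] > 0:
                merged[city] = row
        else:
            # whenNotMatchedInsert
            merged[city] = row
    return merged


# Made-up existing values, for illustration only.
table = {
    "Naypyitaw": {"total_sales_amount": 100.0, "num_unique_products": 6},
    "Yangon": {"total_sales_amount": 90.0, "num_unique_products": 6},
}
updates = {
    "Naypyitaw": {"total_sales_amount": 120000.0, "num_unique_products": 20},
    "Kansas": {"total_sales_amount": 140000.0, "num_unique_products": 24},
}

result = upsert(table, updates)
print(sorted(result))
# ['Kansas', 'Naypyitaw', 'Yangon']
```

Naypyitaw is updated in place, Kansas is inserted, and Yangon is left untouched because no source row matches it.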

Deploying to AWS EMR

Packaging the workflow

Our PySpark job is ready to be deployed. But there are a few steps before we can deploy this to an AWS EMR cluster.

We have to:

  1. Package our dependencies into a zip archive
  2. Upload the zip archive and the main Python entrypoint file (main.py in our case) to S3

Let's look at the command to package our workflow:

    python -m pip install poetry && \
      poetry export -f requirements.txt --without-hashes -o requirements.txt && \
      rm -rf dist/ && mkdir -p dist/ && \
      poetry run pip install . -r requirements.txt -t dist/tmp && \
      cd dist/tmp && \
      find . -name "*.pyc" -delete && \
      zip -r ../packages.zip . -x 'numpy*/*' -x 'pandas*/*' -x 'pyarrow*/*'

Step by step, this command will:

  1. Install poetry
  2. Export dependencies to a requirements.txt file
  3. Clean up the dist/ directory
  4. Install the package and its dependencies into a temp directory
  5. Remove unnecessary compiled files
  6. Create a zip archive of the installed packages, excluding packages that might conflict with existing dependencies in Spark (numpy, pandas, pyarrow)
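Before uploading, it can be worth confirming that the exclusions in the last step actually took effect. A minimal sketch using the standard library follows; the `conflicting_entries` helper and the `EXCLUDED` tuple are hypothetical names that mirror the zip exclusion patterns above.

```python
import zipfile

# Top-level prefixes that the zip command above is meant to exclude.
EXCLUDED = ("numpy", "pandas", "pyarrow")


def conflicting_entries(zip_path: str) -> list:
    """Return archive members whose top-level directory starts with an excluded name."""
    with zipfile.ZipFile(zip_path) as archive:
        return [
            name
            for name in archive.namelist()
            if name.split("/")[0].startswith(EXCLUDED)
        ]
```

An empty list from `conflicting_entries("dist/packages.zip")` means none of the excluded packages made it into the archive.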

Uploading to S3

Let's upload this dist/packages.zip archive and the main.py entrypoint to S3 where it can be accessed by the AWS EMR cluster.

This can also be run as part of a CI pipeline.

    # add checks to see if the zip and file exist
    aws s3 cp ./dist/packages.zip s3://bucket/workflow/
    aws s3 cp pyspark_emr_delta/main.py s3://bucket/workflow/

Both should now be accessible on S3 under bucket/workflow/.
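The existence checks mentioned in the comment above could be sketched in Python as follows. The `build_upload_commands` helper is hypothetical. It only builds the aws s3 cp argument lists, so nothing is uploaded until they are executed.

```python
from pathlib import Path


def build_upload_commands(files, destination="s3://bucket/workflow/"):
    """Return one `aws s3 cp` argument list per file, failing fast if any is missing."""
    missing = [str(f) for f in files if not Path(f).is_file()]
    if missing:
        raise FileNotFoundError(f"Missing artifacts: {', '.join(missing)}")
    return [["aws", "s3", "cp", str(f), destination] for f in files]
```

In a CI pipeline, each returned list could be passed to `subprocess.run(cmd, check=True)` so that a failed upload stops the job.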

Creating the EMR cluster

We'll be running our workflow/application on EMR using the UI. In a production setting, this should be automated using infrastructure as code (IaC) tools like Terraform or by using the boto3 library.

  1. Create Service Roles for EMR and Instance profiles for the EC2 Instances in EMR.
    Roles can be created from the IAM console. We need AmazonS3FullAccess and AmazonEMR-ServiceRole-Policy. These roles allow EMR to access other AWS resources like EC2 Instances and S3 for accessing data and storing logs.

  2. Next, we'll create an EMR cluster.

    • Navigate to the EMR console
    • Click on Create Cluster
    • Enter name for the cluster
    • It is important to select the right release of EMR. Choose the EMR release that ships the Spark version you developed against. For example, the workflow in this post was built with Spark 3.2.1, and we've packaged PySpark 3.2.1 along with the other dependencies for the workflow
    • We only need the Spark application on the EMR cluster
    • We can also enable the AWS Glue data catalog as the Spark catalog. This is useful in scenarios where the ingestion and query engines are different. If we write a delta table to S3 using Spark and with the Glue data catalog as the spark catalog, we can query it from any engine that supports reading Delta tables, like Flink.
  3. Instance Configuration

    • Instance Type: For this post, we used the m5.xlarge instance with 4 vCPUs and 16 GiB memory. Bigger instances cost more, so the smallest available size should suffice for running our workflow.

    • Number of Instances: We'll only use a single Primary node. Most production clusters have Core and Task nodes as well for running Spark executors.

  4. Key Pair
    You can optionally create a key pair for SSH access to the master node.

  5. Network Configuration
    Here we select the VPC and the subnet where our EMR cluster will spawn. Be mindful of whether you're selecting a public or a private subnet. Private subnets are safer but you'd need a bastion on the public subnet to be able to SSH into the master node. Since we're running a simple non-production workflow, we can create our cluster on the public subnet.

  6. Logging
    We'll enable logging so we can inspect the logs from the controller, nodes and the steps in AWS S3.

Finally, click Create Cluster. EMR clusters usually take 2-10 minutes to become active. Once the cluster's active, it enters a Waiting state, ready for the user to submit jobs.

In most production settings, the cluster is ephemeral, i.e. it is spun up before executing a job and torn down directly afterwards to keep costs down.
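The console steps above map roughly onto a single request to the EMR API. The sketch below only builds the request dictionary. The `build_cluster_request` helper is hypothetical, and the default role names are assumptions (they match the names AWS uses for its default EMR roles, but your account may differ).

```python
def build_cluster_request(
    name: str,
    release_label: str,
    log_uri: str,
    subnet_id: str,
    steps: list,
    service_role: str = "EMR_DefaultRole",  # assumption: default EMR service role
    instance_profile: str = "EMR_EC2_DefaultRole",  # assumption: default EC2 profile
) -> dict:
    """Build keyword arguments for an ephemeral, single-node Spark cluster."""
    return {
        "Name": name,
        "ReleaseLabel": release_label,
        "Applications": [{"Name": "Spark"}],
        "LogUri": log_uri,
        "ServiceRole": service_role,
        "JobFlowRole": instance_profile,
        "Steps": steps,
        "Instances": {
            "InstanceGroups": [
                {
                    "Name": "Primary",
                    "InstanceRole": "MASTER",
                    "InstanceType": "m5.xlarge",
                    "InstanceCount": 1,
                }
            ],
            "Ec2SubnetId": subnet_id,
            # Terminate once all steps have finished (ephemeral cluster).
            "KeepJobFlowAliveWhenNoSteps": False,
        },
    }
```

The result could then be passed to boto3 as `boto3.client("emr").run_job_flow(**request)`. Pick the release label by checking the EMR release notes for the one that ships your Spark version.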

Running our PySpark workflow

In the EMR console, go to the Clusters section and select the cluster that we just created.

  1. Click on the Steps tab and select Add Step
  2. Next, we'll give our step a name
  3. Enter the S3 path to the main.py script that we uploaded earlier
  4. Since we also want to include the dependencies that we packaged, we'll add the following arguments so that Spark picks them up:

    --packages org.apache.hadoop:hadoop-aws:3.3.1,io.delta:delta-core_2.12:2.0.1 --py-files s3://bucket/workflow/packages.zip s3://bucket/workflow/main.py

  5. Set the action on failure to CANCEL_AND_WAIT in case of exceptions
  6. Click Add to add the step to the cluster
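The same step can be described programmatically. The sketch below builds a step definition in the shape the EMR API expects, using command-runner.jar to invoke spark-submit. The `build_spark_step` helper and the step name are hypothetical, while the package coordinates and S3 paths are the ones used above.

```python
def build_spark_step(name: str, main_uri: str, py_files_uri: str, packages: list) -> dict:
    """Build an EMR step that runs spark-submit with extra packages and Python files."""
    return {
        "Name": name,
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--packages", ",".join(packages),
                "--py-files", py_files_uri,
                main_uri,
            ],
        },
    }


step = build_spark_step(
    name="pyspark-emr-delta",  # hypothetical step name
    main_uri="s3://bucket/workflow/main.py",
    py_files_uri="s3://bucket/workflow/packages.zip",
    packages=[
        "org.apache.hadoop:hadoop-aws:3.3.1",
        "io.delta:delta-core_2.12:2.0.1",
    ],
)
```

With boto3, the step could be submitted to a running cluster via `boto3.client("emr").add_job_flow_steps(JobFlowId=cluster_id, Steps=[step])`, where `cluster_id` is the ID of your cluster.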

The step will be assigned an ID (Step ID) and will run on the EMR cluster. Logs are available in S3. Step specific logs follow the naming convention: <emr_logs_bucket>/<cluster_id>/steps/<step_id>/.
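When debugging a failed step, it helps to compute that prefix directly. A tiny sketch follows; the `step_log_prefix` helper and the placeholder IDs are hypothetical.

```python
def step_log_prefix(logs_uri: str, cluster_id: str, step_id: str) -> str:
    """Return the S3 prefix under which EMR stores the logs of a single step."""
    return f"{logs_uri.rstrip('/')}/{cluster_id}/steps/{step_id}/"


# Placeholder IDs; real ones are shown in the EMR console.
print(step_log_prefix("s3://emr_logs_bucket/", "j-XXXXXXXX", "s-XXXXXXXX"))
# s3://emr_logs_bucket/j-XXXXXXXX/steps/s-XXXXXXXX/
```

Under that prefix you would typically look for the compressed controller, stdout and stderr files of the step.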

If everything goes well, you should see the output Delta table in the S3 bucket and, if you enabled the Glue data catalog, the table registered there along with all the relevant metadata.

Conclusion

In this post, we explored using PySpark with Delta Lake for managing data processing and analytics. PySpark is a robust Python API for Apache Spark that enables efficient and scalable data processing, making it an ideal choice for big data workloads on the data lake or lakehouse. Delta Lake enhances Spark's capabilities by bringing ACID transactions, metadata handling and unified batch and stream processing to the data lake.

Deploying on AWS EMR simplifies the provisioning and management of clusters.

We at Unskew data have been using PySpark and AWS EMR for our big data workloads and as the backbone of the data platforms that we design for our clients. We've also been adding data lake formats like Delta Lake and Apache Hudi to our data platform offerings to significantly reduce costs when dealing with petabyte-scale data lakes.
If you or your company need help with big data workloads, ETL design and implementation or with data platform and infrastructure, please do reach out and we'll be happy to discuss how our team can solve your challenges.

If you have any suggestions, comments or corrections, please drop a comment down below.