You now run PySpark code to process S3-based data in the EMR cluster.
-
In the EMR Management console, on the dojocluster notebook page, click on the Open in Jupyter button.
-
It will open the notebook environment in a new browser tab or window. In the Jupyter environment, click on the PySpark option under the New menu.
-
It will open the Jupyter notebook IDE. In the cell, copy-paste the following code and run it. The code imports the libraries required for PySpark.
import sys
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
-
You copy-paste and run the following code to get the Spark session. In the EMR PySpark kernel a session typically already exists, so getOrCreate returns that session rather than creating a new one.
spark = SparkSession\
    .builder\
    .appName("SparkETL")\
    .getOrCreate()
-
You copy-paste and run the following code to read the customers.csv data from the S3 bucket and populate it into the customerdf dataframe. If you created the bucket with a different name, use that bucket name. When reading the data, you infer the schema and treat the first row as the header.
customerdf = spark.read.option("inferSchema", "true").option("header", "true").csv("s3://dojo-data/input/customers.csv")
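If you want to sanity-check what was loaded, an optional preview such as the following works (the rows and columns shown depend on the contents of your customers.csv file):
customerdf.show(5)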
-
You copy-paste and run the following code to check the schema of the customerdf dataframe.
customerdf.printSchema()
-
You will perform a small transformation where you select only the CUSTOMERNAME and EMAIL fields from the dataframe. Copy-paste and run the following code to apply this transformation and check the schema of the transformed dataframe.
customerdf = customerdf.select("CUSTOMERNAME", "EMAIL")
customerdf.printSchema()
-
Finally, you want to write the transformed dataframe back to the S3 bucket in the output folder. Copy-paste and run the following code to write the transformed data to the S3 bucket in Parquet format. The overwrite mode replaces any existing data at that location, and Spark writes the output as a folder of part files. If you created the bucket with a different name, use that bucket name.
customerdf.write.format("parquet").mode("overwrite").save("s3://dojo-data/output/")
-
The code has written the output to the S3 bucket. You can verify it by navigating to the output location in the S3 console.
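Alternatively, you can verify the result from the notebook itself. This is a minimal sketch that assumes the same dojo-data bucket name; it reads the Parquet output back and confirms that only the two selected columns are present:
outputdf = spark.read.parquet("s3://dojo-data/output/")
outputdf.printSchema()
outputdf.show(5)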
-
This was an example of how you can run PySpark code in a Jupyter notebook to perform data transformation. The notebook is mostly used for development purposes. In the next step, you run the same code using an EMR Task, which is the method typically used in production.
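For reference, submitting the same code as a step on a running cluster can be done with the boto3 EMR client. This is only a minimal sketch; the cluster id and the etl.py script location below are placeholder assumptions, not values from this exercise:
import boto3

emr = boto3.client("emr")
# Submit the PySpark script as a step; command-runner.jar invokes spark-submit on the cluster.
emr.add_job_flow_steps(
    JobFlowId="j-XXXXXXXXXXXXX",  # placeholder cluster id
    Steps=[{
        "Name": "SparkETL",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["spark-submit", "s3://dojo-data/script/etl.py"],  # hypothetical script location
        },
    }],
)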