Building AWS Glue Job using PySpark - Part:2(of 2)

6: Merge & Split Data Sets

When creating an ETL job, you generally work with more than one data set and apply transformations across them. A transformation can merge two data sets together or split one data set into two or more. In this task, you will learn to merge and split data sets.

  1. On the AWS Glue console, open the Jupyter notebook if it is not already open. In Jupyter, click the New dropdown menu and select the Sparkmagic (PySpark) option. The notebook file opens in a new window. Rename the notebook to multidataset.

  2. Copy and paste the following PySpark snippet (in the black box) into a notebook cell and click Run. It creates the Glue context. Starting the SparkSession and getting the Glue context takes some time. Wait for the confirmation message saying SparkSession available as 'spark'.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())

  3. Run the following PySpark code snippet, which loads data from the sales and customers tables in the dojodatabase database into DynamicFrames.

salesDF = glueContext.create_dynamic_frame.from_catalog(
             database="dojodatabase",
             table_name="sales")
customerDF = glueContext.create_dynamic_frame.from_catalog(
             database="dojodatabase",
             table_name="customers")

  4. Run the following PySpark snippets one at a time to check the schemas of the two DynamicFrames. Both frames have customerid as a common field, so you can join salesDF and customerDF on customerid.

salesDF.printSchema()
customerDF.printSchema()

  5. Run the following PySpark code snippet to join the salesDF and customerDF DynamicFrames on the customerid field. Use the printSchema method to check the schema of the merged frame.

customersalesDF = Join.apply(salesDF, customerDF, 'customerid', 'customerid')
customersalesDF.printSchema()
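
  6. The merged schema shows two columns, customerid and .customerid, because the join keeps the key from both frames. Use the drop_fields method to remove the .customerid field. The backticks around the name are needed because the field name contains a dot.
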
customersalesDF = customersalesDF.drop_fields(['`.customerid`'])
customersalesDF.printSchema()
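
To make the join and drop_fields steps above easier to picture, here is a minimal plain-Python sketch of an equality join that keeps both key columns and then drops the duplicate. It does not use the Glue API; the sample rows and the helper names equality_join and drop_fields are hypothetical, and naming the duplicate key with a leading dot only mimics the schema shown above.

```python
# Hypothetical sample rows; the real tables come from the dojodatabase catalog.
sales = [
    {"ordernumber": 1, "customerid": 10, "sales": 2871.0},
    {"ordernumber": 2, "customerid": 11, "sales": 5512.3},
    {"ordernumber": 3, "customerid": 99, "sales": 1200.0},  # no matching customer
]
customers = [
    {"customerid": 10, "customername": "Acme"},
    {"customerid": 11, "customername": "Globex"},
]


def equality_join(left, right, left_key, right_key):
    """Join rows whose keys are equal; unmatched rows are left out."""
    # Index the right-hand rows by key so each left row is matched quickly.
    index = {}
    for row in right:
        index.setdefault(row[right_key], []).append(row)

    joined = []
    for lrow in left:
        for rrow in index.get(lrow[left_key], []):
            merged = dict(lrow)
            for name, value in rrow.items():
                # A name that already exists gets a dot prefix (assumption:
                # this imitates the ".customerid" column in the joined schema).
                merged["." + name if name in merged else name] = value
            joined.append(merged)
    return joined


def drop_fields(rows, names):
    """Return copies of the rows without the listed fields."""
    return [{k: v for k, v in row.items() if k not in names} for row in rows]


joined = equality_join(sales, customers, "customerid", "customerid")
cleaned = drop_fields(joined, [".customerid"])
print(sorted(joined[0]))
print(sorted(cleaned[0]))
```

In this sketch the order with customerid 99 disappears from the result because it has no matching customer, which is worth checking for in real data after an equality join.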

  7. You can split a DynamicFrame vertically, by column. The SplitFields transform splits a DynamicFrame into a collection of DynamicFrames. Run the following PySpark code snippet to split the salesDF frame into two frames, productsDF and restDF. The productsDF frame has three columns: productline, dealsize and status. The restDF frame has the remaining columns. Use the keys method to confirm that both DynamicFrames were created in the colwiseCollection collection.

colwiseCollection = SplitFields.apply(salesDF, ["productline", "dealsize", "status"], "productsDF", "restDF")
colwiseCollection.keys()

  8. Run the following PySpark snippets one at a time to check the schemas of the two DynamicFrames, productsDF and restDF.

colwiseCollection.select("productsDF").printSchema()
colwiseCollection.select("restDF").printSchema()
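
The column-wise split above can be sketched in plain Python as two projections of the same rows. This is only an illustration of the idea, not the Glue implementation; the sample rows and the split_fields helper are hypothetical.

```python
# Hypothetical sample rows; the real salesDF is loaded from the Glue catalog.
sales_rows = [
    {"ordernumber": 10107, "productline": "Motorcycles",
     "dealsize": "Small", "status": "Shipped", "sales": 2871.0},
    {"ordernumber": 10121, "productline": "Classic Cars",
     "dealsize": "Medium", "status": "Shipped", "sales": 5512.3},
]


def split_fields(rows, paths, name1, name2):
    """Split each row into the listed columns and the remaining columns."""
    wanted = set(paths)
    first = [{k: v for k, v in row.items() if k in wanted} for row in rows]
    second = [{k: v for k, v in row.items() if k not in wanted} for row in rows]
    # A dict keyed by frame name stands in for the DynamicFrame collection.
    return {name1: first, name2: second}


colwise = split_fields(
    sales_rows, ["productline", "dealsize", "status"], "productsDF", "restDF"
)
print(list(colwise.keys()))
print(sorted(colwise["productsDF"][0]))
print(sorted(colwise["restDF"][0]))
```

Note that neither half keeps a shared key in this sketch, so the two results cannot be joined back together unless a key column is included in both.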

  9. Similar to the column-wise split, you can split a DynamicFrame horizontally, by row. The SplitRows transform splits a DynamicFrame into a collection of DynamicFrames based on a condition applied to the rows. Run the following PySpark code snippet to split the salesDF frame into two frames, sales5000plus and sales5000minus. Both frames have the same columns, but sales5000plus has the rows with a sales value above 5000.00 and sales5000minus has the remaining rows, with a sales value of 5000.00 or lower. Use the keys method to confirm that both DynamicFrames were created in the rowwiseCollection collection.

rowwiseCollection = SplitRows.apply(salesDF, {"sales": {">": 5000.00}}, "sales5000plus", "sales5000minus")
rowwiseCollection.keys()

  10. Run the following PySpark snippets one at a time to show the first 20 rows of the sales column in the two DynamicFrames, sales5000plus and sales5000minus.

rowwiseCollection.select("sales5000minus").toDF().select('sales').show()
rowwiseCollection.select("sales5000plus").toDF().select('sales').show()
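
The row-wise split above amounts to partitioning rows by a predicate. The following plain-Python sketch mirrors the comparison dictionary used in the snippet; it is not the Glue implementation. The sample rows, the COMPARATORS table and the split_rows helper are hypothetical, and requiring every listed condition to hold is an assumption of this sketch.

```python
import operator

# Map comparison symbols to Python operators (illustrative set).
COMPARATORS = {
    ">": operator.gt,
    "<": operator.lt,
    ">=": operator.ge,
    "<=": operator.le,
    "=": operator.eq,
    "!=": operator.ne,
}

# Hypothetical sample rows, including one exactly on the boundary.
sales_rows = [
    {"ordernumber": 1, "sales": 2871.0},
    {"ordernumber": 2, "sales": 5000.0},
    {"ordernumber": 3, "sales": 5512.3},
]


def split_rows(rows, comparison, name1, name2):
    """Put rows that satisfy the comparison in name1 and all others in name2."""
    def matches(row):
        # Assumption: every listed condition must hold for a row to match.
        return all(
            COMPARATORS[op](row[column], value)
            for column, conditions in comparison.items()
            for op, value in conditions.items()
        )

    matched = [row for row in rows if matches(row)]
    remaining = [row for row in rows if not matches(row)]
    return {name1: matched, name2: remaining}


rowwise = split_rows(
    sales_rows, {"sales": {">": 5000.00}}, "sales5000plus", "sales5000minus"
)
print([row["sales"] for row in rowwise["sales5000plus"]])
print([row["sales"] for row in rowwise["sales5000minus"]])
```

The boundary row with sales equal to 5000.0 does not satisfy the ">" condition, so it ends up in the second group together with the lower values.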

  11. You can use the union method on a DataFrame to merge two DataFrames with the same schema row-wise. Run the following PySpark code snippet to convert the sales5000minus and sales5000plus DynamicFrames created earlier to DataFrames and merge them into a single DataFrame, allsalesDF.

sales5000minus = rowwiseCollection.select("sales5000minus").toDF()
sales5000plus = rowwiseCollection.select("sales5000plus").toDF()
allsalesDF = sales5000plus.union(sales5000minus)
allsalesDF.show()
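
One detail worth keeping in mind for the union step above: the PySpark DataFrame union method matches columns by position rather than by name and keeps duplicate rows. The plain-Python sketch below illustrates that behavior with tuples; the union_by_position helper and the sample rows are hypothetical and only stand in for the DataFrames.

```python
def union_by_position(columns_a, rows_a, columns_b, rows_b):
    """Append rows_b to rows_a, keeping the column names of the first input."""
    if len(columns_a) != len(columns_b):
        raise ValueError("both inputs need the same number of columns")
    # Column names of the second input are ignored; only position matters.
    return list(columns_a), list(rows_a) + list(rows_b)


# Hypothetical rows standing in for the two halves of the row-wise split.
plus_cols, plus_rows = ["ordernumber", "sales"], [(3, 5512.3)]
minus_cols, minus_rows = ["ordernumber", "sales"], [(1, 2871.0), (2, 5000.0)]

all_cols, all_rows = union_by_position(plus_cols, plus_rows, minus_cols, minus_rows)
print(all_cols, len(all_rows))

# If one side lists its columns in a different order, values land under
# the wrong names, because nothing is matched by name.
swapped_cols, swapped_rows = ["sales", "ordernumber"], [(2871.0, 1)]
_, mixed_rows = union_by_position(plus_cols, plus_rows, swapped_cols, swapped_rows)
print(mixed_rows)
```

In the workshop both halves come from the same salesDF split, so their columns should line up. For DataFrames whose column order differs, the PySpark unionByName method is likely the safer choice.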

  12. So far in the workshop, you have learned to manipulate data within and across data frames. In the next task, you will learn how to write or load data at the destination. Click the Close and Halt option under the File menu to close the notebook. Confirm if it asks to Leave Page.