Building AWS Glue Job using PySpark - Part 2 (of 2)

4: Update the Data

Updating data is one of the most fundamental parts of writing an ETL job, as data is usually transformed before being loaded to the destination. In this task, you learn how to update data. Data is typically updated for formatting, new data creation, correction, aggregation, cleansing, and encoding purposes.

  1. On the AWS Glue console, open the Jupyter notebook if it is not already open. In Jupyter, click the New dropdown menu and select the Sparkmagic (PySpark) option. It opens a notebook file in a new window. Rename the notebook to update.

  2. Copy and paste the following PySpark code snippet into the notebook cell and click Run. It creates the Glue context. It takes some time to start the SparkSession and get the Glue context, so wait for the confirmation message saying SparkSession available as 'spark'.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())
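
Optionally, you can confirm the session is ready by printing the Spark version. This quick sanity check is not part of the original steps; it assumes the spark_session attribute of the GlueContext, which is how Glue job scripts normally obtain the SparkSession.

spark = glueContext.spark_session
print(spark.version)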
  3. Run the following PySpark code snippet, which loads data into a DynamicFrame from the customers table in the dojodatabase database.
customersDF = glueContext.create_dynamic_frame.from_catalog(
             database="dojodatabase",
             table_name="customers")
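
Optionally, you can sanity-check the load by printing the schema and record count of the DynamicFrame; the exact output depends on your copy of the customers table.

customersDF.printSchema()
print(customersDF.count())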
  4. Many times, you need to do a simple concatenation of two fields; for example, creating a new column contactfullname by joining the two existing columns contactfirstname and contactlastname. You can use Python-based logic to perform such a concatenation: create a Python function with the logic and then use the Map.apply method. Run the following code to create a Python function that builds the new contactfullname column from the contactfirstname and contactlastname columns.
def addcolumns(rec):
    # Concatenate the first and last name into a single contactfullname field
    rec["contactfullname"] = rec["contactfirstname"] + ' ' + rec["contactlastname"]
    return rec
  5. The Map transform provides an apply method to run the Python function against every record in the DynamicFrame. Run the following code, which applies the function to create the new contactfullname column from the contactfirstname and contactlastname columns.
customersDF = Map.apply(customersDF, f = addcolumns)
customersDF.toDF().show()
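
As an aside, the same concatenation can also be done with the Spark DataFrame API instead of a Python mapping function. The following is a minimal, optional sketch (not required for the remaining steps) that converts to a Spark DataFrame, adds the column with concat_ws, and converts back using DynamicFrame.fromDF.

from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import concat_ws

# Add the column on the Spark DataFrame, then convert back to a DynamicFrame
sparkDF = customersDF.toDF().withColumn(
    "contactfullname",
    concat_ws(" ", "contactfirstname", "contactlastname"))
customersDF = DynamicFrame.fromDF(sparkDF, glueContext, "customersDF")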
  6. You can use the DropFields transform to drop specific fields from the DynamicFrame. Now that we have the contactfullname column, we no longer need the contactfirstname and contactlastname columns. Run the following PySpark code snippet to drop them from the frame.
customersDF = DropFields.apply(customersDF,["contactfirstname","contactlastname"])
customersDF.toDF().show()
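
The SelectFields transform is the inverse of DropFields: it keeps only the columns you list. A short, optional example using columns already present in this exercise:

# Keep only the listed columns (inverse of DropFields)
selectedDF = SelectFields.apply(customersDF, ["contactfullname", "email"])
selectedDF.toDF().show()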
  7. When using a Python function to transform or update the data, you can build complex logic and use the full capability of the Python language. Let's work on another example. Run the following code to create a Python function that extracts the domain name from the email address.
def parsedomain(rec):
    # Take everything after the '@' in the email address as the domain
    indx = rec["email"].index('@')
    rec["domain"] = rec["email"][indx + 1:]
    return rec
  8. Run the following code, which applies the Python function to create a new domain column from the email column.
customersDF = Map.apply(customersDF, f = parsedomain)
customersDF.toDF().show()
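
Note that rec["email"].index('@') raises a ValueError when an email value does not contain '@'. If your data might include missing or malformed addresses, a more defensive variant (a sketch, assuming an empty string is an acceptable default for the domain) could look like this; you would apply it with Map.apply in the same way.

def parsedomain_safe(rec):
    # Fall back to an empty domain when the email is missing or has no '@'
    email = rec.get("email") or ""
    rec["domain"] = email.split('@', 1)[1] if '@' in email else ""
    return rec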
  9. Sometimes, you might want to rename fields. Use the rename_field method of the DynamicFrame to change the name of a column. Run the following code to rename the territory column to region.
customersDF = customersDF.rename_field("territory","region")
customersDF.toDF().show()
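
Equivalently, the rename could have been done with the RenameField transform from awsglue.transforms instead of the rename_field method; a minimal sketch of that alternative form of the call above:

# Transform-style equivalent of customersDF.rename_field("territory", "region")
customersDF = RenameField.apply(customersDF, "territory", "region")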
  10. These are a few methods to manipulate the columns and data. In the next task, you will learn some aggregation and statistical methods. Click the Close and Halt option under the File menu to close the notebook. Confirm if it asks whether to Leave Page.