After the schema check, the next step is to query data from the source. After all, reading data from the source is the very first step of any ETL process. You will often also want to filter that data based on certain criteria. In this task, you learn the query and filter syntax for PySpark.
-
On the AWS Glue console, open the Jupyter notebook if it is not already open. In Jupyter, click the New dropdown menu and select the Sparkmagic (PySpark) option. It opens a notebook file in a new window. Rename the notebook to query.
-
Copy and paste the following PySpark snippet (in the black box) into the notebook cell and click Run. It creates the Glue context. Starting the SparkSession and getting the Glue context takes some time, so wait for the confirmation message saying SparkSession available as 'spark'.
import sys
from awsglue.transforms import *          # provides transforms such as SelectFields and Filter used below
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Create a GlueContext from the existing (or a new) SparkContext
glueContext = GlueContext(SparkContext.getOrCreate())
- Copy the following PySpark snippet into the notebook cell and click Run, then wait for the execution to finish. It loads a DynamicFrame for the data catalog table sales in the database dojodatabase.
salesDF = glueContext.create_dynamic_frame.from_catalog(
database="dojodatabase",
table_name="sales")
- Run the following PySpark snippet in the notebook cell to show the top 20 rows of the data catalog table sales.
salesDF.toDF().show()
- You can pass a parameter to the show function to display a specific number of rows. Run the following PySpark snippet in the notebook cell to show the top 2 rows of the data catalog table sales.
salesDF.toDF().show(2)
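Although not part of the original steps, Spark's DataFrame show method also accepts a truncate flag, which is handy when column values are long. A minimal sketch using the same salesDF DynamicFrame:
# Show 5 rows without truncating long column values
salesDF.toDF().show(5, truncate=False)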
- You can use the head method to query the top row in a different format. You can also pass the number of rows as a parameter to return a specific number of rows, as shown in the example after this snippet.
salesDF.toDF().head()
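For instance, assuming the same salesDF DynamicFrame, passing a number to head returns that many rows as a list of Row objects:
# Return the first 3 rows as a list of Row objects
salesDF.toDF().head(3)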
- You can also select specific columns instead of displaying all of them. Use the SelectFields transform with the DynamicFrame to query rows with selected columns. Run the following PySpark code snippet to select the ordernumber and sales columns from the salesDF DynamicFrame and populate them into a new DynamicFrame ordersDF.
ordersDF = SelectFields.apply(salesDF,["ordernumber","sales"])
ordersDF.toDF().show()
- You can also perform row-level filtering on the data using the Filter transform. Run the following PySpark snippet in the notebook cell to filter rows where the sales column value is greater than 12000 and populate them into a DynamicFrame highSalesDF.
highSalesDF = Filter.apply(salesDF, f = lambda x: x["sales"] > 12000)
highSalesDF.toDF().show()
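The filter lambda can also combine multiple conditions. As a minimal sketch using only the sales column from the same table (the upper bound of 50000 is an arbitrary value for illustration), the following keeps rows within a sales range:
# Keep rows where sales falls within an illustrative range
midSalesDF = Filter.apply(salesDF, f = lambda x: x["sales"] > 12000 and x["sales"] < 50000)
midSalesDF.toDF().show()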
-
These were some of the PySpark syntax options to query and filter data. Click the Close and Halt option under the File menu to close the notebook. Confirm if it asks whether to Leave Page.
-
In the next task, you learn about updating data.