How do I run SQL commands on an Amazon Redshift table before or after writing data in an AWS Glue job?

I have an AWS Glue job that loads data into an Amazon Redshift table. I want to run SQL commands on Amazon Redshift before or after the AWS Glue job completes.

Resolution

Pass one of the following parameters in the AWS Glue DynamicFrameWriter class (a combined example follows the list):

  • aws_iam_role: Provides authorization to access data in another AWS resource. Use this parameter with the fully specified ARN of the AWS Identity and Access Management (IAM) role that's attached to the Amazon Redshift cluster. For example, use arn:aws:iam::123456789012:role/redshift_iam_role. For more information, see Authorization parameters.
  • preactions: A semicolon-delimited list of SQL commands that are run before the COPY command. If the commands fail, then Amazon Redshift throws an exception.
    Note: Be sure that the preactions parameter doesn't contain newline characters.
  • postactions: A semicolon-delimited list of SQL commands that are run after a successful COPY command. If the commands fail, then Amazon Redshift throws an exception.
    Note: Be sure that the postactions parameter doesn't contain newline characters.
  • extracopyoptions: A list of additional options to append to the Amazon Redshift COPY command when loading data. For example, you might use TRUNCATECOLUMNS or MAXERROR.
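
For illustration only, the following Python sketch combines several of these parameters in a single write. The connection, database, table, and role names (test_red, redshiftdb, schema.target_table, and the IAM role ARN) are placeholders to replace with your own values, and the preactions, postactions, and extracopyoptions values shown are examples rather than required settings:

# Illustrative sketch: combine preactions, postactions, aws_iam_role, and
# extracopyoptions in one write. All names below are placeholders.
datasink = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame = datasource0,
    catalog_connection = "test_red",
    connection_options = {
        "dbtable": "schema.target_table",
        "database": "redshiftdb",
        "aws_iam_role": "arn:aws:iam::123456789012:role/redshift_iam_role",
        "preactions": "truncate table schema.target_table;",
        "postactions": "analyze schema.target_table;",
        "extracopyoptions": "TRUNCATECOLUMNS MAXERROR 2"
    },
    redshift_tmp_dir = "s3://s3path",
    transformation_ctx = "datasink"
)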

Example scenarios

Truncate an Amazon Redshift table before inserting records in AWS Glue

Use the preactions parameter.

See the following Python example:

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame = datasource0,
    catalog_connection = "test_red",
    connection_options = {
        "preactions": "truncate table schema.target_table;",
        "dbtable": "schema.target_table",
        "database": "redshiftdb"
    },
    redshift_tmp_dir = "s3://s3path",
    transformation_ctx = "datasink4"
)

See the following Scala example:

val options = JsonOptions(Map(
   "dbtable" -> "schema.target_table",
   "database" -> "redshiftdb",
   "preactions" -> "truncate table schema.target_table;"
   ))
glueContext.getJDBCSink(
    catalogConnection = "test_red",
    options = options,
    redshiftTmpDir = "s3://s3path",
    transformationContext = "datasource0"
).writeDynamicFrame(datasource0)

In these examples, be sure to replace the following values:

  • test_red: the catalog connection to use
  • schema.target_table: the Amazon Redshift database's schema and the Amazon Redshift table
  • s3://s3path: the path of the Amazon Redshift table's temporary directory

Use an IAM role in the connection options

Because credentials expire after 1 hour, use an IAM role in the connection options to keep long-running connections from failing.

See the following Python example:

glueContext.create_dynamic_frame.from_catalog(
    database = "redshift-database-name",
    table_name = "redshift-table-name",
    redshift_tmp_dir = args["TempDir"],
    additional_options = {"aws_iam_role": "arn:aws:iam::account-id:role/role-name"}
)

See the following Scala example:

val connectionOptions = JsonOptions(Map(
      "url" -> "jdbc:redshift://your_redshift_cluster.us-west-2.redshift.amazonaws.com:5439/database",
      "dbtable" -> "schema.table",
      "user" -> "redshift_user",
      "password" -> "redshift_password",
      "tempdir" -> "s3://temp_bucket/temp",
      "aws_iam_role" -> "arn:aws:iam::your_account_id:role/your_role_name" ))

val dyf = glueContext.getSource("redshift", connectionOptions)
          .getDynamicFrame()

Merge an Amazon Redshift table in AWS Glue (upsert)

Create a merge query after you load the data into a staging table.

Note: For your merge query to work, target_table must already exist in your Amazon Redshift database.

See the following Python example:

post_query="begin;delete from schema.target_table using schema.stage_table where schema.stage_table.id = schema.target_table.id ; insert into schema.target_table select * from schema.stage_table; drop table schema.stage_table; end;"
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "test_red", connection_options = {"preactions":"drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;","dbtable": "schema.stage_table", "database": "redshiftdb","postactions":post_query},
redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

See the following Scala example:

val options = JsonOptions(Map(
   "dbtable" -> "schema.stage_table",
   "database" -> "redshiftdb",
   "preactions" -> "drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;",
   "postactions" -> "begin;delete from schema.target_table using schema.stage_table where schema.stage_table.id = schema.target_table.id ; insert into schema.target_table select * from schema.stage_table; drop table schema.stage_table; end;"
   ))
glueContext.getJDBCSink(
    catalogConnection = "test_red",
    options = options,
    redshiftTmpDir = "s3://s3path",
    transformationContext = "datasink4"
).writeDynamicFrame(datasource0)

In these examples, be sure to replace the following values:

  • schema.target_table: the Amazon Redshift database's schema and the Amazon Redshift table
  • test_red: the catalog connection to use
  • schema.stage_table: the Amazon Redshift database's schema and the Amazon Redshift staging table
  • s3://s3path: the path of the Amazon Redshift table's temporary directory

For more information, see Use a staging table to perform a merge (upsert).

Ignore rows that aren't valid

Use the extracopyoptions parameter to specify a higher MAXERROR value.

See the following Python example:

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame = persons_DyF,
    catalog_connection = "test",
    connection_options = {
        "dbtable": "testalblog2",
        "database": "reddb",
        "postactions": "delete from emp1;",
        "extracopyoptions": "MAXERROR 2"
    },
    redshift_tmp_dir = "s3://s3path",
    transformation_ctx = "datasink4"
)

See the following Scala example:

val options = JsonOptions(Map(
   "dbtable" -> "testalblog2",
   "database" -> "reddb",
   "preactions" -> "drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;",
   "postactions" -> "delete from emp1;",
   "extracopyoptions" -> "MAXERROR 2"
   ))
glueContext.getJDBCSink(
    catalogConnection = "test",
    options = options,
    redshiftTmpDir = "s3://s3path",
    transformationContext = "datasink4"
).writeDynamicFrame(persons_DyF)

In these examples, be sure to replace the following values:

  • schema.target_table: the Amazon Redshift database's schema and the Amazon Redshift table
  • schema.stage_table: the Amazon Redshift database's schema and the Amazon Redshift staging table
  • test: the catalog connection to use
  • testalblog2: the Amazon Redshift table to load data into
  • reddb: the Amazon Redshift database
  • emp1: the Amazon Redshift table to delete the data from, after the data is loaded into testalblog2
  • s3://s3path: the path of the Amazon Redshift table's temporary directory

Additional information

You can use the Amazon Redshift Spark connector (redshift-jdbc42-2.1.0.9) in AWS Glue 4.0 ETL jobs. The connector has the following properties (a usage sketch follows the list):

  • Supports IAM-based JDBC URLs.
  • Includes performance improvement options like autopushdown, autopushdown.s3_result_cache, and unload_s3_format.
  • Includes the SSE_KMS encryption option that can be used for data in the temporary folder. AWS Glue uses this data when reading from Amazon Redshift tables.
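
The following Python sketch is illustrative only. It assumes an AWS Glue 4.0 job that reads from a Data Catalog table backed by Amazon Redshift and passes connector options through additional_options; the database, table, and role names are placeholders, and the option values shown are examples rather than recommended settings:

# Illustrative AWS Glue 4.0 read that passes Amazon Redshift Spark connector
# options. All names below are placeholders.
dyf = glueContext.create_dynamic_frame.from_catalog(
    database = "redshift-database-name",
    table_name = "redshift-table-name",
    redshift_tmp_dir = args["TempDir"],
    additional_options = {
        "aws_iam_role": "arn:aws:iam::account-id:role/role-name",  # IAM-based authorization (placeholder ARN)
        "autopushdown": "true",                   # push supported operations down to Amazon Redshift
        "autopushdown.s3_result_cache": "false",  # don't reuse cached Amazon S3 query results
        "unload_s3_format": "PARQUET"             # format for data unloaded to the temporary folder
    }
)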

Related information

COPY

TRUNCATE

Data load operations
