Orchestrating flat file consumption with Snowpipe & Tasks
Snowflake has proven to be a force to be reckoned with among SQL-based databases, offering a great deal of flexibility without the overhead of hosting and maintenance. Its recent offerings for building and automating pipelines that consume data from external sources make it even more compelling than its competitors.
Amazed by the possibilities, I intend to provide a glimpse of one such pipeline that becomes possible if we play our cards right.
We will delve into how the Snowpipe service can be used in tandem with Snowflake Tasks to automate ingestion of flat files from an external service for analytics.
The data we will be using comes from citibike's open-source data store.
The Python code used to query citibike's API and drop the files into an S3 bucket is out of scope for this article; we will thus be focusing on the Snowflake side of things.
Definitions:
RAW table: A landing table for all rows read directly from the flat file. The table name used here is trips_raw.
INT table: Houses the cleaned, well-defined columns that are essential for analytics. The table name used here is trips_int.
Snowpipe: A service that detects new files placed in a particular S3 bucket or folder and runs a defined COPY command to load them into a table.
Tasks: Snowflake Tasks let you execute a single SQL statement or a stored procedure either on a schedule or when a condition is fulfilled.
Step 1: Create a Snowpipe
Assuming that you have the RAW and INT table structures defined, we can move on to creating a Snowpipe.
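If you haven't created them yet, a minimal sketch of the two tables could look like the following. The column list is shortened and the names and types are illustrative; align them with your flat file and with the INSERT statement used in Step 3.
create table if not exists trips_raw (
    tripduration          number,
    starttime             varchar,  -- kept as raw text; parsed into timestamps later
    stoptime              varchar,
    "start_station_id"    number,
    "start station name"  varchar,
    "bikeid"              number,
    "usertype"            varchar,
    "birth year"          number,
    "gender"              number
);
create table if not exists trips_int (
    tripduration        number,
    starttime           timestamp_ntz,
    stoptime            timestamp_ntz,
    start_station_id    number,
    start_station_name  varchar,
    bikeid              number,
    usertype            varchar,
    birth_year          number,
    gender              varchar   -- decoded to 'Male'/'Female' by the task in Step 3
);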
A Snowpipe is defined with the following syntax (see the pipe creation docs linked at the end):
CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] <name>
  [ AUTO_INGEST = [ TRUE | FALSE ] ]
  [ AWS_SNS_TOPIC = <string> ]
  [ INTEGRATION = '<string>' ]
  [ COMMENT = '<string_literal>' ]
AS <copy_statement>
We will thus be creating a pipe named trips_pipe.
create or replace pipe trips_pipe
auto_ingest = true
as
copy into trips_raw from @BIKEDATA_STAGE/snowflake_data/snowpipe_folder/;
Here we have created a pipe named trips_pipe, which performs a copy from a stage named BIKEDATA_STAGE into our RAW table.
Please refer to Snowflake's documentation on creating a stage if you do not have one already.
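For reference, an external stage over the S3 bucket could be created roughly as follows. This is a sketch: the bucket path, credentials and file format are placeholders, and a storage integration is the preferred alternative to embedding keys.
create or replace stage BIKEDATA_STAGE
  url = 's3://<your-bucket>/snowflake_data/'
  credentials = (aws_key_id = '<aws_key_id>' aws_secret_key = '<aws_secret_key>')
  file_format = (type = csv skip_header = 1);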
Once created, use the
show pipes
command to verify that the pipe exists and to grab the notification_channel ARN that Snowflake assigns to it.
Note: The ARN assigned by Snowflake refers to an SQS queue that will be used to queue the files that need to be ingested.
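If you prefer to pull the ARN programmatically rather than reading it off the SHOW output, something along these lines works (a sketch using RESULT_SCAN over the last query):
show pipes like 'trips_pipe';
select "notification_channel"
from table(result_scan(last_query_id()));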
We will need to perform a few extra steps to establish a connection between our S3 source bucket and the SQS queue that Snowflake has provisioned. Armed with the ARN, let us head over to our S3 bucket.
The idea here is to have S3 send a notification to the SQS queue whenever a new file is dropped into a specified folder within the bucket.
Head over to the Events box located in the bucket properties.
Create a new notification and copy the ARN received from Snowflake into the SQS queue ARN box.
Prefix: the folder that will be monitored; a notification is sent whenever a file is dropped into it.
Events: here we have specified PUT as the event.
Once done, test the pipe by dropping a file into the monitored folder.
You can monitor the status of the pipe using the following code:
select SYSTEM$PIPE_STATUS('trips_pipe');
As seen here, pendingFileCount is 1, indicating that the file dropped into the bucket is currently being processed.
Once this process completes, the resultant data should appear in our destination, the RAW table in this case.
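A quick sanity check on the landing table never hurts (a sketch; COPY_HISTORY is handy for per-file load status):
select count(*) from trips_raw;
-- recent loads into the table, per file
select file_name, row_count, status
from table(information_schema.copy_history(
    table_name => 'TRIPS_RAW',
    start_time => dateadd(hour, -1, current_timestamp())));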
Alright! We are now good for Step 2.
Step 2: Establish a Stream on our RAW Table
Since we are not truncating our RAW table (so that it remains a reliable snapshot of all data ingested to date), we will use streams to identify the rows that have just arrived.
This stream will be used to aid CDC (change data capture) and avoid duplication.
To create a stream:
CREATE [ OR REPLACE ] STREAM <stream_name> ON TABLE <table_name>
We will thus create a stream named NEW_TRIPS.
create or replace stream NEW_TRIPS on table trips_raw;
Try testing the enhanced pipeline by dropping another file into the S3 bucket.
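Once the new file has been ingested, the stream should surface only the freshly arrived rows along with the metadata columns (a quick check):
-- returns TRUE while the stream holds unconsumed rows
select SYSTEM$STREAM_HAS_DATA('NEW_TRIPS');
-- the metadata columns are included automatically in the stream's output
select * from NEW_TRIPS limit 10;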
Awesome! Off to Step 3 then.
Note:
- Streams expose the columns of the original table plus three additional columns: METADATA$ACTION, METADATA$ISUPDATE and METADATA$ROW_ID.
- Data remains in a stream as long as it isn't consumed. Consumption here entails executing DML (e.g., INSERT or COPY) statements that read from the stream.
Step 3: Task to move data from RAW to INT
Tasks in Snowflake allow one to execute statements on a schedule or upon fulfillment of a condition.
Let's define one wherein we will not only move the data but also perform some basic conversions.
CREATE [ OR REPLACE ] TASK [ IF NOT EXISTS ] <name>
  WAREHOUSE = <string>
  [ SCHEDULE = '{ <num> MINUTE | USING CRON <expr> <time_zone> }' ]
  [ <session_parameter> = <value> [ , <session_parameter> = <value> ... ] ]
  [ USER_TASK_TIMEOUT_MS = <num> ]
  [ COPY GRANTS ]
  [ COMMENT = '<string_literal>' ]
  [ AFTER <string> ]
  [ WHEN <boolean_expr> ]
AS
  <sql>
Note: Resuming and running tasks requires the EXECUTE TASK privilege, which by default only the ACCOUNTADMIN role holds (it can grant it to other roles). Do ensure your user has it.
CREATE OR REPLACE TASK copy_raw_to_int
WAREHOUSE = COMPUTE_WH
SCHEDULE = '5 minute'
WHEN SYSTEM$STREAM_HAS_DATA('NEW_TRIPS')
AS
insert into trips_int
select
    TRIPDURATION,
    -- rebuild the M/D/YYYY H:MI source strings as YYYY-MM-DD H:MI before casting
    to_timestamp(split(split(STARTTIME,'/')[2],' ')[0] || '-' || split(STARTTIME,'/')[0] || '-' || split(STARTTIME,'/')[1] || ' ' || split(split(STARTTIME,'/')[2],' ')[1]) as STARTTIME,
    to_timestamp(split(split(STOPTIME,'/')[2],' ')[0] || '-' || split(STOPTIME,'/')[0] || '-' || split(STOPTIME,'/')[1] || ' ' || split(split(STOPTIME,'/')[2],' ')[1]) as STOPTIME,
    "start_station_id",
    "start station name",
    "start station latitude",
    "start station longitude",
    "end station id",
    "end station name",
    "end station latitude",
    "end station longitude",
    "bikeid",
    "usertype",
    "birth year",
    DECODE("gender", 0, 'Male', 1, 'Female') as gender
from UTIL_DB.PUBLIC.NEW_TRIPS;
Dissecting the above code:
- WAREHOUSE: Specify the warehouse you intend to use. Size it based on the volume of data you expect to process and how frequently the task runs.
- SCHEDULE: Determines when and how often the task is executed. Here I have chosen to run the task every 5 minutes; a CRON-based alternative is sketched after this list.
- WHEN: This condition is crucial to ensure that the task does not execute unless the stream has new data. It reduces the number of runs and cuts down on costs too.
- AS: The SQL statement (for a single statement) or stored procedure call (for multi-step logic) to be executed when the WHEN condition is met.
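For reference, the schedule can also be expressed as a CRON expression with a time zone. The sketch below is illustrative only; the task name, CRON expression and placeholder body are assumptions, and in practice you would reuse the INSERT statement from the task above.
CREATE OR REPLACE TASK copy_raw_to_int_cron
WAREHOUSE = COMPUTE_WH
-- every 15 minutes between 9am and 5pm on weekdays, US Eastern time
SCHEDULE = 'USING CRON */15 9-17 * * MON-FRI America/New_York'
WHEN SYSTEM$STREAM_HAS_DATA('NEW_TRIPS')
AS
select 1; -- placeholder body; substitute the RAW-to-INT INSERT shown earlier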
Once created, a task is in a SUSPENDED state by default.
To enable it, we execute an ALTER statement with the RESUME keyword.
Alter task if exists copy_raw_to_int RESUME;
It is always good practice to include the IF EXISTS clause to avoid errors when the task does not exist.
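You can confirm the state change with SHOW TASKS; the state column should flip from suspended to started once the RESUME succeeds.
show tasks like 'copy_raw_to_int';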
Debugging Task Execution
Snowflake exposes the task_history() table function to review each task execution and its status.
Command:
select *
from table(information_schema.task_history())
order by scheduled_time desc;
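The function also accepts arguments to narrow the output to a single task and time window, which is handy once several tasks are running (a sketch):
select name, state, scheduled_time, error_message
from table(information_schema.task_history(
    task_name => 'COPY_RAW_TO_INT',
    scheduled_time_range_start => dateadd(hour, -4, current_timestamp())))
order by scheduled_time desc;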
We thus drop another file into our S3 bucket, which triggers the Snowpipe and, in turn, our task to clean and channel the data into our INT table.
Another execution of the query against the task_history() table shows the following:
Finally, we will head over to our INT table and check how it looks:
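A simple peek at the most recent transformed rows does the trick (any ordering column will do):
select * from trips_int order by starttime desc limit 10;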
Voila! We have data!
A curious thing to check here is the stream to see if it still has records:
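A quick count against the stream created earlier answers that:
select count(*) from NEW_TRIPS;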
Awesome! As expected, the stream is purged once its contents have been consumed successfully.
With a bit of googling, a few blogs, and a bunch of SQL queries, we now have an automated pipeline that not only reads data from a flat file but also cleans it and loads it into our final table.
Imagine the possibilities!
Other Useful links
Snowflake docs to create tasks
https://docs.snowflake.com/en/sql-reference/sql/create-task.html
Trouble shooting tasks
https://docs.snowflake.com/en/user-guide/tasks-ts.html#task-did-not-run
Pipe Creation and Syntax
https://docs.snowflake.com/en/sql-reference/sql/create-pipe.html
Task Alteration and states
https://docs.snowflake.com/en/sql-reference/sql/alter-task.html
Citibike datastore