![](https://dixidata.com/wp-content/uploads/2022/01/marina-grynykha-bEAL-aN10vs-unsplash-1-scaled.jpeg)
Snowflake supports continuous data pipelines in the form of Snowpipe, which “enables loading data from files as soon as they’re available in a stage”, automating the loading of data into database tables. It can consume data files from cloud provider stages such as Amazon S3, Google Cloud Storage, and Microsoft Azure Blob Storage/Data Lake Storage/general-purpose storage.
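For context, a minimal auto-ingest pipe might look something like the sketch below. The stage, pipe and table names (ST_SNOWPIPE, PIPE_LOCATIONS, LOCATIONS) match the examples used later in this post, the CSV file format is an assumption, and the cloud event notification setup that auto-ingest relies on is omitted:
create or replace pipe PIPE_LOCATIONS
  auto_ingest = true
  as
  copy into LOCATIONS
  from @ST_SNOWPIPE
  file_format = (type = 'CSV');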
However, at the time of writing, the problem most people face is that while Snowpipe loads files automatically once they are “dropped” into the stage, it does not PURGE them afterwards; unlike COPY INTO, that option isn’t supported for Snowpipe. This means that once the pipe has consumed and loaded a file, the file remains in the cloud provider’s storage account indefinitely, which makes determining a file’s state difficult. We are then paying twice to store the data, once there and once in Snowflake, so how do we deal with this?!
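For reference, the behaviour we’d like to mimic is the PURGE copy option available to manual bulk loads, which removes staged files once they’ve loaded successfully. A minimal example, using the same illustrative names as above:
copy into LOCATIONS
  from @ST_SNOWPIPE
  file_format = (type = 'CSV')
  purge = true;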
“At a glance, how do I know Snowpipe is working?”
As a data engineer, when developing data pipelines, I like to simplify my staged data areas (and my life) so that with a quick glance I can tell the state of that container:
- No files = downstream processes have successfully completed (loaded any files delivered to the container)
- Lots of files = potentially a bottleneck or downstream processing issue
- Old files = files have failed and need investigation
This isn’t to say that this approach is an alternative to robust logging structures and alert processes, but nothing puts a data engineer’s mind more at ease than glancing at a source container and understanding its state without needing to cross-reference the logs first.
“The Options?”
There are two main approaches we can implement to keep our stage area nice and clean for cost and performance benefits:
- Implement file retention policies against the container as provided by the cloud provider
- Develop a solution in Snowflake that chains together some functionality to manage the stage
Option 1 appears simple at first: let the cloud provider manage the removal of older files based on a retention policy (e.g. remove files created or last modified more than N days ago). However, the data engineer won’t know whether the files have been successfully loaded via Snowpipe, and may not have any control over the retention policy.
What if we turn off Snowpipe? What if an error silently blocks loading? What if a scenario occurs that means the files reach the retention policy age without being loaded?!
– Thoughts of a worried Data Engineer
For my peace of mind, I prefer to implement solutions that account for the processing results: files won’t just vanish unless they’ve been loaded. To achieve Option 2 and this pseudo-PURGE functionality for Snowpipe, we just need to implement a Stored Procedure and run it on a defined schedule using a Task. Let me show you how.
The Solution!
A Stored Procedure and a Task
create or replace procedure SP_SNOWPIPE_PURGE(STAGE_NAME VARCHAR, PIPE_NAME VARCHAR, TABLE_NAME VARCHAR)
returns VARCHAR
language JAVASCRIPT
execute as CALLER
as
$$
var i = 0, ii = 0;
var list_cmd = `LIST @` + STAGE_NAME + `;`;
var sql_cmd = `SELECT DISTINCT ST."name", SPLIT_PART(ST."name",'/',-1), CH.STATUS
FROM table(result_scan(last_query_id())) ST
INNER JOIN (select * from table(information_schema.copy_history(
    table_name=>'` + TABLE_NAME + `', start_time=> dateadd(days, -1, current_timestamp())
))) CH
ON CH.STAGE_LOCATION || CH.FILE_NAME = ST."name"
WHERE CH.PIPE_NAME = '` + PIPE_NAME + `';`;
try {
    snowflake.execute({sqlText: list_cmd});                     // List files currently in the stage
    var rs = snowflake.execute({sqlText: sql_cmd});             // Join the list to COPY_HISTORY to calculate load status
    while (rs.next()) {
        ii++;                                                   // Count files
        var c1 = rs.getColumnValue(1);                          // Full stage path of the file
        var c2 = rs.getColumnValue(2);                          // File name only
        var c3 = rs.getColumnValue(3);                          // Load status
        if (c3 == "Loaded") {
            var rmv_cmd = "remove @" + STAGE_NAME + "/" + c2;   // Build Remove Command
            snowflake.execute({sqlText: rmv_cmd});              // Exec Remove Command
            i++;                                                // Count deletes
        }
    }
    return "Purged " + i + " of " + ii + " file(s)!";           // Return a success indicator.
} catch (err) {
    return "Failed: " + err;                                    // Return an error indicator.
}
$$
;
create task TSK_SNOWPIPE_PURGE_LOCATIONS
WAREHOUSE = WH_DIXI_LOADING
SCHEDULE = 'USING CRON 0 1 * * * GMT' // 1AM Daily
as
CALL SP_SNOWPIPE_PURGE('ST_SNOWPIPE','PIPE_LOCATIONS','LOCATIONS')
;
“So what’s happening?”
1. LIST command
A simple Snowflake command that shows us which files are currently stored in the Snowflake Stage/cloud provider’s storage account, and provides the source data that drives the process.
LIST @ST_SNOWPIPE;
2. COPY_HISTORY table function
We then need to compare our list of staged data files against the load history for Snowpipe. The COPY_HISTORY table function holds Snowflake’s load history for up to the last 14 days; by joining this to the file list we can determine which files to remove.
SELECT DISTINCT ST."name", SPLIT_PART(ST."name",'/',-1), CH.STATUS
FROM table(result_scan(last_query_id())) ST
INNER JOIN (select * from table(information_schema.copy_history(
    table_name=>'LOCATIONS', start_time=> dateadd(days, -1, current_timestamp())
))) CH
ON CH.STAGE_LOCATION || CH.FILE_NAME = ST."name"
WHERE CH.PIPE_NAME = 'PIPE_LOCATIONS';
Tip: I also apply a filter on the Snowpipe pipe name here to not interfere with files loaded manually with COPY INTO statements.
We can’t use the LIST command’s result set directly as a table source in our SQL query; instead we use the RESULT_SCAN function to retrieve the previous query’s results as a TABLE we can query.
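As a minimal standalone illustration of that pattern (the quoted column names such as "name", "size" and "last_modified" come from the LIST output):
LIST @ST_SNOWPIPE;
SELECT "name", "size", "last_modified"
FROM table(result_scan(last_query_id()));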
3. REMOVE Command
For each row returned by the COPY_HISTORY query, we check whether its status is “Loaded”, indicating Snowpipe success, and if so we execute a REMOVE command to delete that file from the stage area. Files with a status other than “Loaded” remain for investigation (though I’m sure your system has robust monitoring on every pipeline process, so you already know there’s been an error, right 😉). This REMOVE command replicates the PURGE functionality we find in COPY INTO statements.
REMOVE @ST_SNOWPIPE/Locations.csv;
4. RETURN status
Finally, a VARCHAR value is returned to the Snowflake output to indicate whether the Stored Procedure executed successfully.
On Success Sample:
CALL SP_SNOWPIPE_PURGE('ST_SNOWPIPE','PIPE_LOCATIONS','LOCATIONS'); RETURNS “Purged 4 of 5 file(s)!”
On Error Example:
CALL SP_SNOWPIPE_PURGE('ST_SNOWPIPE','PIPE_LOCATIONS','LOCATIONZ'); RETURNS “Failed: SQL compilation error: Table 'LOCATIONZ' does not exist or not authorized.”
5. TASK DDL
The final step is to schedule the execution of our Stored Procedure so it doesn’t require manual intervention. This can be done every N minutes or using a CRON expression; I’ve opted for the latter, running at 1 AM each day.
CREATE TASK TSK_SNOWPIPE_PURGE_LOCATIONS
WAREHOUSE = WH_DIXI_LOADING
SCHEDULE = 'USING CRON 0 1 * * * GMT' // 1AM Daily
AS
CALL SP_SNOWPIPE_PURGE('ST_SNOWPIPE','PIPE_LOCATIONS','LOCATIONS');
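One thing worth remembering: tasks are created in a suspended state, so the schedule won’t fire until the task is resumed:
ALTER TASK TSK_SNOWPIPE_PURGE_LOCATIONS RESUME;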
And that’s it! The Stored Procedure will execute nightly at 1AM as defined by our schedule and keep the storage containers squeaky clean and manageable with little user intervention while we sleep soundly at night.
The Before and After
![](https://dixidata.com/wp-content/uploads/2022/01/Screenshot-2022-01-06-at-16.15.40-1.png)
1. Container of 5 files
(Some loaded, some haven’t)
![](https://dixidata.com/wp-content/uploads/2022/01/Screenshot-2022-01-06-at-16.16.11-1.png)
2. Execution of Stored Procedure
![](https://dixidata.com/wp-content/uploads/2022/01/Screenshot-2022-01-06-at-16.17.17-1.png)
3. Container after execution
(the process leaves the one file that failed to load, as it was broken)
“What are the further considerations?”
This is purely for illustrative purposes, showing how to enable PURGE logic against Snowpipe’s automated consumption. Why not develop the solution further yourself for productionisation by adding some of the following improvements:
- Improve error handling to return more information than counts or to continue deleting the other files on error if applicable
- Store the results of executing the Stored Procedure in a log table, which can be extended to include the names of the files deleted and when each was removed (see the sketch after this list)
- Generalise to clear down all stages rather than a single stage/pipe/table combination
- Archive the source data files before handing them to Snowpipe’s container in case you need to perform a restore
- Return a VARIANT JSON Object or Array from the Stored Procedure for more flexibility on the next steps
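As a rough sketch of the logging idea mentioned above (the table and column names here are purely illustrative), the Stored Procedure could write its outcome to a simple log table:
create table if not exists SNOWPIPE_PURGE_LOG (
  EXECUTED_AT TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP(),
  STAGE_NAME VARCHAR,
  PIPE_NAME VARCHAR,
  RESULT VARCHAR
);
-- e.g. executed from within the Stored Procedure before returning:
insert into SNOWPIPE_PURGE_LOG (STAGE_NAME, PIPE_NAME, RESULT)
values ('ST_SNOWPIPE', 'PIPE_LOCATIONS', 'Purged 4 of 5 file(s)!');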
Or get in touch with Dixi Data who will be happy to discuss how you can develop your Snowflake integration environment further!