Incremental export of Snowflake table data to a GCS bucket

Problem Statement:

Sync data flowing into a large Snowflake table to a bucket

The overall problem is to sync changes occurring in a Snowflake table to a BigQuery table. The Snowflake table is owned by an external organization that doesn’t want to do much development or maintenance. The BigQuery table is owned by us, and we have access to most GCP services. The external org does not want to give us access to the Snowflake account in any shape or form, so they won’t give us credentials to query Snowflake directly. Since we can create and manage GCP buckets, we have scoped the collaborative problem to the smallest possible set of steps: get the data out of Snowflake and into a bucket. We can handle the ingest from there.

Constraints:

  • Must be totally automated
  • Must sync only what has changed to save costs
  • Must be implemented totally inside Snowflake

Solution:

It turns out you can do this all inside Snowflake, and it’s not even very hard once you know what to do: create a stream on the table to track changes, then schedule a task that unloads the stream’s contents to an external stage pointing at the bucket.

In stock form, the unload will overwrite the same path over and over again. Because you can’t pass dynamic variables to the raw task, you need a function that generates a fresh path prefix for each run.

Let’s break this down with a reproducible example. Let’s assume the bucket already exists and is called bucket-i-own.

Create a demo database and schema:

CREATE DATABASE INCREMENTAL_SEND_POC;
CREATE SCHEMA INCREMENTAL_SEND_POC.SAMPLE_DATA;

Create an integration (requires appropriate permissions):

-- CREATE A STORAGE INTEGRATION
CREATE STORAGE INTEGRATION bucket_i_own
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'GCS'
ENABLED = TRUE
STORAGE_ALLOWED_LOCATIONS = ('gcs://bucket-i-own/');

-- GET THE NAME OF THE SERVICE ACCOUNT TO ACCESS THE BUCKET
DESC STORAGE INTEGRATION bucket_i_own ;

Let’s assume the service account returned by the DESC query is saaa00000@awsuseast2-111.iam.gserviceaccount.com.

In the GCP console, go to the bucket and click Permissions. Under View By Principals, click Grant Access, paste the service account into New Principals, and select Storage Object Admin under Assign IAM Roles.

Create a stage:

CREATE OR REPLACE STAGE demo_orders
  URL = 'gcs://bucket-i-own/orders'
  STORAGE_INTEGRATION = bucket_i_own
  FILE_FORMAT = ( TYPE = PARQUET )
  COMMENT = 'Managed GCP bucket for receiving demo data';

At this point you can technically unload whatever you want from Snowflake into the bucket. Let’s create our function and procedure for dynamic paths (thanks Big Data Dave):

-- A FUNCTION TO RETURN THE CURRENT TIME (EPOCH MILLISECONDS) AS A STRING
CREATE OR REPLACE FUNCTION UDF_NOWASSTRING()
    RETURNS STRING
    LANGUAGE JAVASCRIPT
    AS
    $$
    return Date.now().toString();
    $$;

-- TEST THE FUNCTION    
SELECT UDF_NOWASSTRING();

--A PROCEDURE TO UNLOAD A TABLE TO A STAGE AT A PROVIDED PATH PREFIX
CREATE OR REPLACE PROCEDURE SP_UNLOAD_DYNAMIC_PATH(PATH STRING, SOURCE_TABLE STRING, DEST_STAGE STRING)
    RETURNS STRING
    LANGUAGE JAVASCRIPT
    EXECUTE AS CALLER
    AS
    $$
     
    //Result Place Holder
    var result="";
     
    //Build SQL
    var sql00 = `COPY INTO ` + DEST_STAGE + `/` + PATH + `/ FROM (SELECT * from ` + SOURCE_TABLE + `)
        detailed_output=true header=true;`;

    var sql01 = `select count(*) from ` + DEST_STAGE +  `/` + PATH + `/;`;
 
    //Execute SQL
    try {
        var stmt00 = snowflake.createStatement( { sqlText: sql00 } );
        stmt00.execute();
         
        var stmt01 = snowflake.createStatement( { sqlText: sql01 } );
        var rs01 = stmt01.execute();
        rs01.next();
        var rowCount = (stmt01.getRowCount()>0) ? rs01.getColumnValue(1) : 0;
          
        result = "Succeeded! Rows Unloaded(" + rowCount + ")";
    }
     
    catch (err)  {
        result =  "Failed: Code: " + err.code + "\n  State: " + err.state;
        result += "\n  Message: " + err.message;
        result += "\nStack Trace:\n" + err.stackTraceTxt; 
    }
     
    return result;
    $$;
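
Because the procedure builds its SQL by string concatenation, whatever the caller passes ends up verbatim in the statement. Below is a minimal sketch of a guard, assuming stage names look like @name and table names are unquoted, dot-separated identifiers. The helper buildUnloadSql and its patterns are hypothetical and not part of the original procedure; similar checks could be placed at the top of the procedure body before snowflake.createStatement is called.

```javascript
// Hypothetical helper, not part of the original procedure: builds the same
// COPY INTO statement, but rejects arguments that do not look like plain
// identifiers. The patterns are assumptions and deliberately strict
// (no quoted identifiers, no spaces, no semicolons).
var PATH_RE = /^[A-Za-z0-9_-]+(\/[A-Za-z0-9_-]+)*$/;                          // e.g. "1700000000000" or "2024/01/31"
var TABLE_RE = /^[A-Za-z_][A-Za-z0-9_$]*(\.[A-Za-z_][A-Za-z0-9_$]*){0,2}$/;   // table, schema.table, db.schema.table
var STAGE_RE = /^@[A-Za-z_][A-Za-z0-9_$]*(\.[A-Za-z_][A-Za-z0-9_$]*){0,2}$/;  // @stage, optionally qualified

function buildUnloadSql(path, sourceTable, destStage) {
    if (!PATH_RE.test(path)) {
        throw new Error("Invalid path prefix: " + path);
    }
    if (!TABLE_RE.test(sourceTable)) {
        throw new Error("Invalid source table: " + sourceTable);
    }
    if (!STAGE_RE.test(destStage)) {
        throw new Error("Invalid stage: " + destStage);
    }
    // Same statement shape as sql00 in the procedure above.
    return "COPY INTO " + destStage + "/" + path + "/ FROM (SELECT * FROM " + sourceTable + ")"
        + " detailed_output=true header=true;";
}
```

With this in place, a value such as 'ORDERS; DROP TABLE ORDERS' raises an error instead of being executed as part of the unload.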

OK, now we have everything in place to run a manual demo. You can step through the code below interactively and watch the results.

-- CREATE AN INITIAL EMPTY TABLE
CREATE OR REPLACE TABLE INCREMENTAL_SEND_POC.SAMPLE_DATA.ORDERS AS
    SELECT L_ORDERKEY, L_COMMITDATE FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.LINEITEM LIMIT 0;

-- VERIFY TABLE EXISTS AND HAS NO DATA
SELECT COUNT(*) FROM INCREMENTAL_SEND_POC.SAMPLE_DATA.ORDERS;
-- WE HAVE 0 ROWS    

-- CREATE A STREAM ON THE TABLE
CREATE OR REPLACE STREAM INCREMENTAL_SEND_POC.SAMPLE_DATA.ORDERS_STRM 
ON TABLE INCREMENTAL_SEND_POC.SAMPLE_DATA.ORDERS;

-- INSERT SOME DATA INTO THE BASE TABLE
INSERT INTO INCREMENTAL_SEND_POC.SAMPLE_DATA.ORDERS 
    SELECT L_ORDERKEY, L_COMMITDATE FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.LINEITEM WHERE L_COMMITDATE = DATE('1998-10-22');
-- NUMBER OF ROWS INSERTED: 413

SELECT COUNT(*) FROM INCREMENTAL_SEND_POC.SAMPLE_DATA.ORDERS_STRM;
-- STREAM HAS 413 ROWS

-- MANUAL UNLOAD
COPY INTO '@demo_orders/' FROM (SELECT * FROM INCREMENTAL_SEND_POC.SAMPLE_DATA.ORDERS_STRM) overwrite=true detailed_output=true header=true;

SELECT COUNT(*) FROM INCREMENTAL_SEND_POC.SAMPLE_DATA.ORDERS_STRM;
-- 0 ROWS, ALL HAVE BEEN UNLOADED

SELECT COUNT(*) FROM @demo_orders;
-- 413 ROWS ARE IN THE BUCKET

-- USE THE PROCEDURE TO UNLOAD TO THE BUCKET
-- INSERT SOME MORE DATA INTO THE BASE TABLE
INSERT INTO INCREMENTAL_SEND_POC.SAMPLE_DATA.ORDERS 
    SELECT L_ORDERKEY, L_COMMITDATE FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.LINEITEM WHERE L_COMMITDATE = DATE('1998-10-22');
-- NUMBER OF ROWS INSERTED: 413

-- CALL THE PROCEDURE
CALL SP_UNLOAD_DYNAMIC_PATH(UDF_NOWASSTRING(), 'INCREMENTAL_SEND_POC.SAMPLE_DATA.ORDERS_STRM' , '@demo_orders');

SELECT COUNT(*) FROM INCREMENTAL_SEND_POC.SAMPLE_DATA.ORDERS_STRM;
-- 0 ROWS, ALL HAVE BEEN UNLOADED

SELECT COUNT(*) FROM @demo_orders;
-- 826 ROWS ARE IN THE BUCKET

OK, now to automate it.

-- SET UP A RECURRING TASK TO UNLOAD
CREATE OR REPLACE TASK INCREMENTAL_SEND_POC.SAMPLE_DATA.ORDERS_STRM_TSK
WAREHOUSE = COMPUTE_WH
SCHEDULE = '1 minute' -- very frequent for purposes of demo
WHEN
SYSTEM$STREAM_HAS_DATA('INCREMENTAL_SEND_POC.SAMPLE_DATA.ORDERS_STRM')
AS
CALL SP_UNLOAD_DYNAMIC_PATH(UDF_NOWASSTRING(), 'INCREMENTAL_SEND_POC.SAMPLE_DATA.ORDERS_STRM' , '@demo_orders')
;

DESCRIBE TASK ORDERS_STRM_TSK;

-- SET UP A RECURRING TASK TO SIMULATE ADDITIONS TO THE TABLE
CREATE OR REPLACE TASK INCREMENTAL_SEND_POC.SAMPLE_DATA.ADD_RECORDS_TO_SAMPLE_STREAM
WAREHOUSE = COMPUTE_WH
SCHEDULE = '1 minute' -- very frequent for purposes of demo
AS
INSERT INTO INCREMENTAL_SEND_POC.SAMPLE_DATA.ORDERS 
    SELECT L_ORDERKEY, L_COMMITDATE FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.LINEITEM WHERE L_COMMITDATE = DATE('1998-10-22')
;

DESCRIBE TASK ADD_RECORDS_TO_SAMPLE_STREAM;

ALTER TASK INCREMENTAL_SEND_POC.SAMPLE_DATA.ORDERS_STRM_TSK RESUME;
ALTER TASK INCREMENTAL_SEND_POC.SAMPLE_DATA.ADD_RECORDS_TO_SAMPLE_STREAM RESUME;

SHOW TASKS;

-- WAIT X MINUTES
-- CHECK BUCKET
-- WITH A 1 MINUTE SCHEDULE THERE SHOULD NOW BE ABOUT X PREFIXES (+1 FOR THE MANUAL PROCEDURE CALL) IN THE BUCKET
-- BUCKET CONTENTS SHOULD HAVE ABOUT X * 413 RECORDS ON TOP OF THE 826 FROM THE MANUAL STEPS

You should see data being unloaded into the bucket every minute. Obviously this is pretty inefficient; I would probably run this every few hours or daily.
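
For a daily or hourly schedule, a date-based prefix may be easier to browse in the bucket than epoch milliseconds. Here is a minimal sketch of an alternative to the body of UDF_NOWASSTRING; the function name datePrefix and the YYYY/MM/DD/HHMMSS layout are assumptions for illustration and not part of the original setup.

```javascript
// Hypothetical alternative to Date.now().toString(): formats a Date as a
// UTC "YYYY/MM/DD/HHMMSS" string so each run lands under a date-based folder.
function datePrefix(date) {
    // Left-pad a number to two digits.
    function pad(n) {
        return ("0" + n).slice(-2);
    }
    return date.getUTCFullYear()
        + "/" + pad(date.getUTCMonth() + 1)   // getUTCMonth() is zero-based
        + "/" + pad(date.getUTCDate())
        + "/" + pad(date.getUTCHours()) + pad(date.getUTCMinutes()) + pad(date.getUTCSeconds());
}

// Example: a fixed timestamp, so the result is reproducible.
var example = datePrefix(new Date(Date.UTC(2024, 0, 31, 15, 4, 5)));
// example is "2024/01/31/150405"
```

Inside a JavaScript UDF the body would presumably reduce to the helper plus return datePrefix(new Date());. The procedure would then write each run under a path like orders/2024/01/31/150405/.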

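Finally, don’t forget to delete everything so you don’t end up with a zombie task! The storage integration is an account-level object, so dropping the database does not remove it; drop it separately.

DROP DATABASE INCREMENTAL_SEND_POC;
DROP STORAGE INTEGRATION bucket_i_own;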