S3 to Snowflake: Parallel Processing Using Task Trees

Snowflake makes it easy to copy data from an S3 bucket (JSON, CSV, parquet) into a Snowflake table either using its bulk load option or Snowpipe, for continuous loading. In this blog we will discuss ingestion using the bulk load option and more importantly how to implement parallel processing using Snowflake’s task trees.

COPY INTO: Snowflake Bulk Loading

Once secure access to your S3 bucket has been configured, the COPY INTO command can be used to bulk load data from your “S3 Stage” into Snowflake. Copy Into is an easy to use and highly configurable command that gives you the option to specify a subset of files to copy based on a prefix, pass a list of files to copy, validate files before loading, and also purge files after loading. 

In the following example, when run, the command will copy the sales.json file from an S3 bucket stage, load the contents into the house_sales table and delete the file after successful load.

copy into house_sales
from @mystage/sales.json
file_format = json_file_format
purge = true;

Once your copy into statement has been generated and validated, a Snowflake task can be used to bulk load data on a schedule. The following example will create an S3 task that runs every 2 hours and calls a simple stored procedure which executes the copy into command.

CREATE OR REPLACE TASK s3_to_snowflake_task
WAREHOUSE = s3_warehouse
SCHEDULE = '120 MINUTE'
AS
CALL sync_s3_to_snowflake_sp();

This implementation works great if you’re able to efficiently load all the files you need in one copy into command. If you have a scenario where you need to bulk load a high volume of files and your COPY INTO command is taking a significantly long time to complete, it may be beneficial for you to run multiple COPY INTO commands in parallel, especially if you are able to easily group files under common prefixes, e.g. bucket\northamerica\*, bucket\europe\*, etc. While you can call multiple SQL queries within one stored procedure, they can only be run sequentially versus async. The other option would be to have multiple tasks running in parallel, which is doable when you create a tree of tasks. Each child task launched will create its own session which would allow multiple queries to run in parallel on a single warehouse. Alternatively you can choose to assign each child task a different warehouse.  Multiple child tasks can also call the same stored procedure and you can pass parameters, such as the bucket prefix. You can learn more about the Snowflake tree of tasks here.

In the following example, you have a root task s3_load_task_root, and 3 child tasks. Once the root task finishes running, which in this scenario creates a Snowflake table if not already existing, the child tasks which call the bulk load stored procedure run.

A few things to keep in mind:

  • The root task needs to be enabled before the child tasks using the task RESUME command.
  • To enable all child tasks under a root task, you can run: select system$task_dependents_enable(‘root_task_name’);
CREATE OR REPLACE TASK s3_load_task_root
WAREHOUSE = s3_warehouse
 SCHEDULE = '120 MINUTE'
AS
CALL create_s3_load__table();
CREATE OR REPLACE TASK s3_load_task_node1
WAREHOUSE = s3_warehouse
AFTER s3_load_task_root
AS
call sync_s3_to_snowflake_sp('NorthAmerica');
CREATE OR REPLACE TASK s3_load_task_node2
WAREHOUSE = s3_warehouse
AFTER s3_load_task_root
AS
call sync_s3_to_snowflake_sp('Europe');
CREATE OR REPLACE TASK s3_load_task_node3
WAREHOUSE = s3_warehouse
AFTER s3_load_task_root
AS
call sync_s3_to_snowflake_sp('Africa'