Snowflake makes it easy to copy data from an S3 bucket (JSON, CSV, Parquet) into a Snowflake table, either using its bulk load option or Snowpipe for continuous loading. In this blog we will discuss ingestion using the bulk load option and, more importantly, how to implement parallel processing using Snowflake’s task trees.
COPY INTO: Snowflake Bulk Loading
In the following example, the command copies the sales.json file from a stage pointing to an S3 bucket, loads its contents into the house_sales table, and deletes the file after a successful load.
copy into house_sales
from @mystage/sales.json
file_format = (format_name = 'json_file_format')
purge = true;
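For completeness, the COPY INTO example above assumes a named file format and an external stage already exist. A minimal sketch of that setup is shown below; the bucket URL and the storage integration name (s3_int) are placeholders for your own environment.

-- JSON file format referenced by the COPY INTO command
create or replace file format json_file_format
  type = json;

-- External stage pointing at the S3 bucket; url and storage_integration are placeholders
create or replace stage mystage
  url = 's3://my-bucket/sales/'
  storage_integration = s3_int
  file_format = json_file_format;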
Once your COPY INTO statement has been generated and validated, a Snowflake task can be used to bulk load data on a schedule. The following example creates a task that runs every 2 hours and calls a simple stored procedure that executes the COPY INTO command.
CREATE OR REPLACE TASK s3_to_snowflake_task
  WAREHOUSE = s3_warehouse
  SCHEDULE = '120 MINUTE'
AS
  CALL sync_s3_to_snowflake_sp();
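The stored procedure itself is not shown above; a minimal Snowflake Scripting sketch, assuming the table, stage, and file format names used earlier, could look like this:

CREATE OR REPLACE PROCEDURE sync_s3_to_snowflake_sp()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
BEGIN
  -- Run the same bulk load shown earlier
  copy into house_sales
    from @mystage/sales.json
    file_format = (format_name = 'json_file_format')
    purge = true;
  RETURN 'Load complete';
END;
$$;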
This implementation works well if you can efficiently load all the files you need with one COPY INTO command. If you need to bulk load a high volume of files and a single COPY INTO command takes a long time to complete, it may be beneficial to run multiple COPY INTO commands in parallel, especially if you can easily group files under common prefixes, e.g. bucket/northamerica/*, bucket/europe/*, etc. While you can run multiple SQL statements within one stored procedure, they execute sequentially rather than asynchronously. The other option is to run multiple tasks in parallel, which you can do by creating a tree of tasks. Each child task runs in its own session, which allows multiple queries to run in parallel on a single warehouse; alternatively, you can assign each child task a different warehouse. Multiple child tasks can also call the same stored procedure, passing different parameters such as the bucket prefix. You can learn more about the Snowflake tree of tasks here.
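To illustrate how the bucket prefix can be passed in, here is a sketch of a variant of sync_s3_to_snowflake_sp that accepts a prefix and builds the COPY INTO statement dynamically; the stage layout (@mystage/<prefix>/) and the procedure signature are assumptions for the example, not part of the original setup.

CREATE OR REPLACE PROCEDURE sync_s3_to_snowflake_sp(prefix VARCHAR)
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
DECLARE
  stmt VARCHAR;
BEGIN
  -- Build a COPY INTO statement scoped to the files under the given prefix
  stmt := 'copy into house_sales ' ||
          'from @mystage/' || prefix || '/ ' ||
          'file_format = (format_name = ''json_file_format'') ' ||
          'purge = true';
  EXECUTE IMMEDIATE stmt;
  RETURN 'Load complete for prefix ' || prefix;
END;
$$;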
In the following example, you have a root task, s3_load_task_root, and three child tasks. Once the root task finishes running (in this scenario it creates the target Snowflake table if it does not already exist), the child tasks, which call the bulk load stored procedure, run.
A few things to keep in mind:
- Child tasks need to be resumed before the root task using the ALTER TASK … RESUME command; resume the root task last (see the example after the task definitions below).
- To enable all child tasks under a root task in one call, you can run: select system$task_dependents_enable('root_task_name');
CREATE OR REPLACE TASK s3_load_task_root
  WAREHOUSE = s3_warehouse
  SCHEDULE = '120 MINUTE'
AS
  CALL create_s3_load_table();
CREATE OR REPLACE TASK s3_load_task_node1
  WAREHOUSE = s3_warehouse
  AFTER s3_load_task_root
AS
  CALL sync_s3_to_snowflake_sp('NorthAmerica');

CREATE OR REPLACE TASK s3_load_task_node2
  WAREHOUSE = s3_warehouse
  AFTER s3_load_task_root
AS
  CALL sync_s3_to_snowflake_sp('Europe');

CREATE OR REPLACE TASK s3_load_task_node3
  WAREHOUSE = s3_warehouse
  AFTER s3_load_task_root
AS
  CALL sync_s3_to_snowflake_sp('Africa');
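Finally, once the tree has been created, here is a quick sketch of enabling it, following the resume order noted above (child tasks first, root task last), along with the single-call option from the earlier note:

alter task s3_load_task_node1 resume;
alter task s3_load_task_node2 resume;
alter task s3_load_task_node3 resume;
alter task s3_load_task_root resume;

-- Alternatively, enable the tasks in the tree with a single call:
select system$task_dependents_enable('s3_load_task_root');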