How to Use AWS Glue with Snowflake

The process of extraction, transformation and load (ETL) is central to any data warehousing initiative. With advances in cloud data warehouse architectures, customers are also benefiting from the alternative approach of extraction, load, and transformation (ELT), where data processing is pushed to the database.

With either approach, the debate continues. Should you take a hand-coded method or leverage any number of the available ETL or ELT data integration tools? While there are advantages to both and some will choose a “one or the other” approach, many organizations select a combination of a data integration tool and hand coding. Code provides developers with the flexibility to build using preferred languages while maintaining a high level of control over integration processes and structures. The challenge has been that hand-coding options are traditionally more complex and costly to maintain. However, with AWS Glue, developers now have an option to easily build and manage their data preparation and loading processes with generated code that is customizable, reusable and portable with no infrastructure to buy, set up or manage.

In this blog, we’ll cover how to leverage the power of AWS Glue with Snowflake and how processing is optimized through the use of query pushdown for ELT.

Why AWS Glue with Snowflake

Snowflake customers now have a simple option to manage their programmatic data integration processes without worrying about servers, Spark clusters or the ongoing maintenance traditionally associated with these systems. AWS Glue provides a fully managed environment which integrates easily with Snowflake’s data warehouse-as-a-service. Together, these two solutions enable customers to manage their data ingestion and transformation pipelines with more ease and flexibility than ever before. With AWS Glue and Snowflake, customers get the added benefit of Snowflake’s query pushdown which automatically pushes Spark workloads, translated to SQL, into Snowflake. Customers can focus on writing their code and instrumenting their pipelines without having to worry about optimizing Spark performance (For more on this, read our “Why Spark” and our “Pushing Spark Query Processing to Snowflake” blogs). With AWS Glue and Snowflake, customers can reap the benefits of optimized ELT processing that is low cost and easy to use and maintain.

AWS Glue and Snowflake in Action

Prerequisites:

Setup

  1. Log into AWS.
  2. Search for and click on the S3 link.
    1. Create an S3 bucket and folder.
    2. Add the Spark Connector and JDBC .jar files to the folder.
    3. Create another folder in the same bucket to be used as the Glue temporary directory in later steps (see below).
  3. Switch to the AWS Glue Service.
  4. Click on Jobs on the left panel under ETL.
  5. Add a job by clicking Add job, click Next, click Next again, then click Finish.
    1. Provide a name for the job.
    2. Select an IAM role. Create a new IAM role if one doesn’t already exist and be sure to add all Glue policies to this role.
    3. Select the option for A new script to be authored by you.
    4. Give the script a name.
    5. Set the temporary directory to the one you created in step 2.3.
    6. Expand Script libraries and job parameters:
      1. Under Dependent jars path, add entries for both .jar files from step 2.2.

[NOTE: You have to add the full path to the actual .jar files. Example: s3://[bucket_name]/GlueJars/spark-snowflake_2.11-2.2.6.jar,s3://[bucket_name]/GlueJars/snowflake-jdbc-3.2.4.jar]

      2. Under Job parameters, enter the keys the sample script expects (--URL, --ACCOUNT, --WAREHOUSE, --DB, --SCHEMA, --USERNAME, --PASSWORD) with your Snowflake account information. Make sure to include the two dashes before each key.

[NOTE: Storing your account information and credentials this way will expose them to anyone with access to this job. This can be useful for testing purposes, but it is recommended that you securely store your credentials as outlined in the section Securing credentials.]

  6. Click Next, click Next again, then click Finish.
  7. You will be prompted with a blank script interface.
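
If you prefer to keep these settings in code rather than typing them into the console, they can be sketched as a plain Python dictionary. This is a minimal sketch, not an official AWS or Snowflake utility: the helper name build_default_arguments, the bucket name, the folder names and the placeholder values are all hypothetical. --extra-jars and --TempDir are AWS Glue's special job parameters for dependent jars and the temporary directory; the remaining keys are the ones the sample script reads.

```python
def build_default_arguments(bucket, jar_folder, jar_files, temp_folder, snowflake_params):
    """Return a dict of Glue job arguments (hypothetical helper, for illustration only)."""
    # Dependent jars must be full S3 paths, comma-separated, with no spaces.
    jar_paths = ",".join(
        "s3://{}/{}/{}".format(bucket, jar_folder, name) for name in jar_files
    )
    arguments = {
        "--extra-jars": jar_paths,
        "--TempDir": "s3://{}/{}/".format(bucket, temp_folder),
    }
    # Every job parameter key needs the two leading dashes.
    for key, value in snowflake_params.items():
        arguments["--" + key] = value
    return arguments


# Placeholder values: replace all of them with your own.
example = build_default_arguments(
    bucket="my-example-bucket",
    jar_folder="GlueJars",
    jar_files=["spark-snowflake_2.11-2.2.6.jar", "snowflake-jdbc-3.2.4.jar"],
    temp_folder="GlueTemp",
    snowflake_params={"URL": "[account_url]", "WAREHOUSE": "[warehouse_name]"},
)
for name in sorted(example):
    print(name, "=", example[name])
```

Keeping credentials out of this dictionary is still advisable; see the note above about exposing them to anyone with access to the job.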

Sample script

Use the following sample script to test the integration between AWS Glue and your Snowflake account. This script assumes you have stored your account information and credentials using Job parameters as described in step 5.6.2 of the setup.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from py4j.java_gateway import java_import
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

## @params: [JOB_NAME, URL, ACCOUNT, WAREHOUSE, DB, SCHEMA, USERNAME, PASSWORD]
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'URL', 'ACCOUNT', 'WAREHOUSE', 'DB', 'SCHEMA', 'USERNAME', 'PASSWORD'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
java_import(spark._jvm, SNOWFLAKE_SOURCE_NAME)

## uj = sc._jvm.net.snowflake.spark.snowflake
spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession(spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())
sfOptions = {
"sfURL" : args['URL'],
"sfAccount" : args['ACCOUNT'],
"sfUser" : args['USERNAME'],
"sfPassword" : args['PASSWORD'],
"sfDatabase" : args['DB'],
"sfSchema" : args['SCHEMA'],
"sfWarehouse" : args['WAREHOUSE'],
}

## Read from a Snowflake table into a Spark Data Frame
df = spark.read.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("dbtable", "[table_name]").load()

## Perform any kind of transformations on your data and save as a new Data Frame:
## df1 = df.[Insert any filter, transformation, or other operation]
df1 = df  ## placeholder so the script runs as-is; replace with your own transformation

## Write the Data Frame contents back to Snowflake in a new table
df1.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("dbtable", "[new_table_name]").mode("overwrite").save()
job.commit()
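
To see why each job parameter key needs its two leading dashes, it can help to mimic the argument handling on your own machine. The sketch below is a simplified stand-in written with argparse; it is not the awsglue implementation of getResolvedOptions, and the function name resolve_options and the sample values are hypothetical.

```python
import argparse


def resolve_options(argv, option_names):
    """Parse '--NAME value' pairs from argv into a dict (simplified stand-in)."""
    parser = argparse.ArgumentParser(allow_abbrev=False)
    for name in option_names:
        # Mark each option as required so a missing key is reported immediately.
        parser.add_argument("--" + name, required=True)
    # parse_known_args tolerates extra arguments that a job runner may pass along.
    known, _unknown = parser.parse_known_args(argv[1:])
    return vars(known)


# Hypothetical command line, shaped like the one a job would receive.
sample_argv = [
    "script.py",
    "--JOB_NAME", "demo-job",
    "--URL", "[account_url]",
    "--some-other-flag", "ignored",
]
args = resolve_options(sample_argv, ["JOB_NAME", "URL"])
print(args["JOB_NAME"], args["URL"])
```

A key entered without the dashes would be treated as a stray positional value rather than an option, which is the failure mode the setup instructions warn about.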

Securing credentials

To securely store your account information and credentials, see the following article which describes how this is accomplished with EC2: How to Securely Store Credentials with EC2.

Conclusion

AWS Glue and Snowflake make it easy to get started and manage your programmatic data integration processes. AWS Glue can be used standalone or in conjunction with a data integration tool without adding significant overhead. With native query pushdown through the Snowflake Spark connector, this approach optimizes both processing and cost for true ELT processing. With AWS Glue and Snowflake, customers get a fully managed, fully optimized platform to support a wide range of custom data integration requirements.

 


Connecting a Jupyter Notebook to Snowflake via Spark (Part 4)

In the third part of this series, we learned how to connect Sagemaker to Snowflake using the Python connector. In this fourth and final post, we’ll cover how to connect Sagemaker to Snowflake with the Spark connector. If you haven’t already downloaded the Jupyter Notebooks, you can find them here.  

You can review the entire blog series here: Part One > Part Two > Part Three > Part Four.  

Spark Connector – local Spark

We’ll start with building a notebook that uses a local Spark instance. The Snowflake JDBC driver and the Spark connector must both be installed on your local machine. Installation of the drivers happens automatically in the Jupyter Notebook, so there’s no need for you to manually download the files. However, as a reference, the drivers can be downloaded here.

First, let’s review the installation process. The step outlined below handles downloading all of the necessary files plus the installation and configuration. You can initiate this step by performing the following actions:

  • Create a directory for the snowflake jar files
  • Define the drivers to be downloaded
  • Identify the latest version of the driver
  • Download the driver
%%bash
SFC_DIR=/home/ec2-user/snowflake
[ ! -d "$SFC_DIR" ] && mkdir -p "$SFC_DIR"
cd $SFC_DIR
PRODUCTS='snowflake-jdbc spark-snowflake_2.11'
for PRODUCT in $PRODUCTS
do
   wget "https://repo1.maven.org/maven2/net/snowflake/$PRODUCT/maven-metadata.xml" 2> /dev/null
   VERSION=$(grep latest maven-metadata.xml | awk -F">" '{ print $2 }' | awk -F"<" '{ print $1 }')
   DRIVER=$PRODUCT-$VERSION.jar
   if [[ ! -e $DRIVER ]]
   then
      rm $PRODUCT* 2>/dev/null
      wget "https://repo1.maven.org/maven2/net/snowflake/$PRODUCT/$VERSION/$DRIVER" 2> /dev/null
   fi
   [ -e maven-metadata.xml ] && rm maven-metadata.xml
done
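
The grep and awk pipeline in the step above pulls the latest version number out of maven-metadata.xml. For readers who would rather do that part in Python, here is a minimal sketch using the standard library XML parser. The function names are hypothetical, and the sample metadata is a made-up placeholder rather than real repository content.

```python
import xml.etree.ElementTree as ET

MAVEN_BASE = "https://repo1.maven.org/maven2/net/snowflake"


def latest_version(metadata_xml):
    """Return the text of the <latest> element from maven-metadata.xml content."""
    root = ET.fromstring(metadata_xml)
    latest = root.find("./versioning/latest")
    if latest is None or not latest.text:
        raise ValueError("no <latest> element found in metadata")
    return latest.text.strip()


def driver_url(product, version):
    """Build the jar download URL using the same layout as the shell step."""
    return "{0}/{1}/{2}/{1}-{2}.jar".format(MAVEN_BASE, product, version)


# Illustrative metadata only; the version number is a placeholder.
SAMPLE_METADATA = """<metadata>
  <groupId>net.snowflake</groupId>
  <artifactId>snowflake-jdbc</artifactId>
  <versioning>
    <latest>0.0.0</latest>
    <release>0.0.0</release>
  </versioning>
</metadata>"""

version = latest_version(SAMPLE_METADATA)
print(driver_url("snowflake-jdbc", version))
```

Parsing the XML structurally avoids depending on how the file happens to be broken into lines, which the grep-based approach implicitly relies on.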

After the JDBC driver and the Spark connector are installed, you’re ready to create the SparkContext. But first, let’s review how the step below accomplishes this task.

To successfully build the SparkContext, you must add the newly installed libraries to the CLASSPATH. The step starts by running a shell command to list the contents of the installation directory, then adds the result to the CLASSPATH. With the Spark configuration pointing to all of the required libraries, you’re now ready to build both the Spark and SQL context.

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext,SparkSession
from pyspark.sql.types import *
from sagemaker_pyspark import IAMRole, classpath_jars
from sagemaker_pyspark.algorithms import KMeansSageMakerEstimator

sfc_jars=!ls -d /home/ec2-user/snowflake/*.jar

conf = (SparkConf()
        .set("spark.driver.extraClassPath", (":".join(classpath_jars())+":"+":".join(sfc_jars)))
        .setMaster('local')
        .setAppName('local-spark-test'))
sc=SparkContext(conf=conf)

spark = SQLContext(sc)
sc
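
The !ls line in the step above only works inside a Jupyter/IPython session. As a rough equivalent for plain Python, the classpath string could be assembled with glob. This is a sketch under assumptions: build_extra_classpath is a hypothetical helper name, and /opt/example/base.jar stands in for whatever classpath_jars() returns on your instance.

```python
import glob
import os


def build_extra_classpath(base_jars, jar_dir):
    """Join existing classpath entries with every .jar file found in jar_dir."""
    # sorted() keeps the result stable between runs.
    extra_jars = sorted(glob.glob(os.path.join(jar_dir, "*.jar")))
    # The notebook step joins entries with a colon, as used on Linux.
    return ":".join(list(base_jars) + extra_jars)


# If the directory does not exist, only the base entries are returned.
print(build_extra_classpath(["/opt/example/base.jar"], "/home/ec2-user/snowflake"))
```

The returned string can be passed to spark.driver.extraClassPath in the same way as the joined value in the step above.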

With the SparkContext now created, you’re ready to load your credentials. You can complete this step following the same instructions covered in part three of this series.

import boto3

params=['/SNOWFLAKE/URL','/SNOWFLAKE/ACCOUNT_ID'
        ,'/SNOWFLAKE/USER_ID','/SNOWFLAKE/PASSWORD'
        ,'/SNOWFLAKE/DATABASE','/SNOWFLAKE/SCHEMA'
        ,'/SNOWFLAKE/WAREHOUSE','/SNOWFLAKE/BUCKET'
        ,'/SNOWFLAKE/PREFIX']

region='us-east-1'

def get_credentials(params):
   ssm = boto3.client('ssm',region)
   response = ssm.get_parameters(
      Names=params,
      WithDecryption=True
   )
   #Build dict of credentials
   param_values={k['Name']:k['Value'] for k in  response['Parameters']}
   return param_values

param_values=get_credentials(params)
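
One thing to watch with get_parameters is that names it cannot find are reported under InvalidParameters in the response rather than raising an error, so a typo in a parameter name only surfaces later as a missing dictionary key. A hedged variation on the function above checks for that up front. The client is passed in so the logic can be tried without AWS access; FakeSSM is a hypothetical in-memory stand-in, not part of boto3.

```python
def get_credentials_checked(ssm_client, names):
    """Fetch parameters and fail loudly if any requested name is missing."""
    response = ssm_client.get_parameters(Names=names, WithDecryption=True)
    missing = response.get("InvalidParameters", [])
    if missing:
        raise KeyError("missing SSM parameters: " + ", ".join(sorted(missing)))
    return {p["Name"]: p["Value"] for p in response["Parameters"]}


class FakeSSM(object):
    """Hypothetical in-memory stand-in for the SSM client, for local testing only."""

    def __init__(self, store):
        self.store = store

    def get_parameters(self, Names, WithDecryption=False):
        found = [{"Name": n, "Value": self.store[n]} for n in Names if n in self.store]
        invalid = [n for n in Names if n not in self.store]
        return {"Parameters": found, "InvalidParameters": invalid}


fake = FakeSSM({"/SNOWFLAKE/URL": "[account_url]", "/SNOWFLAKE/SCHEMA": "[schema]"})
print(get_credentials_checked(fake, ["/SNOWFLAKE/URL", "/SNOWFLAKE/SCHEMA"]))
```

With a real client, the call would look like get_credentials_checked(boto3.client('ssm', region), params).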

After the SparkContext is up and running, you’re ready to begin reading data from Snowflake through the spark.read method. For this example, we’ll be reading 50 million rows.

sfOptions = {
  "sfURL" : param_values['/SNOWFLAKE/URL'],
  "sfAccount" : param_values['/SNOWFLAKE/ACCOUNT_ID'],
  "sfUser" : param_values['/SNOWFLAKE/USER_ID'],
  "sfPassword" : param_values['/SNOWFLAKE/PASSWORD'],
  "sfDatabase" : param_values['/SNOWFLAKE/DATABASE'],
  "sfSchema" : param_values['/SNOWFLAKE/SCHEMA'],
  "sfWarehouse" : param_values['/SNOWFLAKE/WAREHOUSE'],
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query", \
"select (V:main.temp_max - 273.15) * 1.8000 + 32.00 as temp_max_far, " +\
"       (V:main.temp_min - 273.15) * 1.8000 + 32.00 as temp_min_far, " +\
"       cast(V:time as timestamp) time, " +\
"       V:city.coord.lat lat, " +\
"       V:city.coord.lon lon " +\
"from snowflake_sample_data.weather.weather_14_total limit 50000000").load()

Here, you’ll see that I’m running a Spark instance on a single machine (i.e., the notebook instance server). On my notebook instance, it took about 2 minutes to first read 50 million rows from Snowflake and compute the statistical information.
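
The temperature columns in the query above are computed with (value - 273.15) * 1.8 + 32. Assuming the source values are in Kelvin, as the 273.15 offset suggests, this is the standard Kelvin-to-Fahrenheit conversion. A quick sketch to sanity-check the arithmetic outside of SQL (the function name is hypothetical):

```python
import math


def kelvin_to_fahrenheit(kelvin):
    """Same arithmetic as the SQL expression: (K - 273.15) * 1.8 + 32."""
    return (kelvin - 273.15) * 1.8 + 32.0


# Check against two well-known reference points.
assert math.isclose(kelvin_to_fahrenheit(273.15), 32.0)   # freezing point of water
assert math.isclose(kelvin_to_fahrenheit(373.15), 212.0)  # boiling point of water
print(round(kelvin_to_fahrenheit(300.0), 2))
```

Because the conversion is written into the query, it runs in Snowflake and only the converted values are read into Spark.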

Reading the full dataset (225 million rows) can render the notebook instance unresponsive. This is likely due to running out of memory. To mitigate this issue, you can either build a bigger notebook instance by choosing a different instance type or run Spark on an EMR cluster. The first option is usually referred to as scaling up, while the latter is called scaling out. Scaling out is more complex, but it also provides you with more flexibility. As such, we’ll review how to run the notebook instance against a Spark cluster.

Using the Spark Connector to create an EMR cluster

Harnessing the power of Spark requires connecting to a Spark cluster rather than a local Spark instance. Building a Spark cluster that is accessible by the Sagemaker Jupyter Notebook requires the following steps:

  • The Sagemaker server needs to be built in a VPC and therefore within a subnet
  • Build a new security group to allow incoming requests from the Sagemaker subnet via Port 8998 (Livy API) and SSH (Port 22) from your own machine (Note: This is for test purposes)
  • Advanced Options
    • Use the Advanced options link to configure all of the necessary options
  • Software and Steps
    • Pick Hadoop and Spark
    • Optionally, you can select Zeppelin and Ganglia
  • Hardware
    • Validate the VPC (Network). Note: The Sagemaker host needs to be created in the same VPC as the EMR cluster
    • Optionally, you can also change the instance types and indicate whether or not to use spot pricing
  • General Cluster Settings
    • Set the cluster name
    • Keep Logging for troubleshooting problems
  • Security
    • Pick an EC2 key pair (create one if you don’t have one already). Without the key pair, you won’t be able to access the master node via ssh to finalize the setup.
    • Create an additional security group to enable access via SSH and Livy
  • On the EMR master node, install pip packages sagemaker_pyspark, boto3 and sagemaker for python 2.7 and 3.4
  • Install the Snowflake Spark & JDBC driver
  • Update Driver & Executor extra Class Path to include Snowflake driver jar files

Let’s walk through this next process step-by-step. In the AWS console, find the EMR service, click “Create Cluster”, then click “Advanced Options”.

Creating a Spark cluster is a four-step process. Step one requires selecting the software configuration for your EMR cluster. (Note: Uncheck all other packages, then check Hadoop, Livy, and Spark only).

Step two specifies the hardware (i.e., the types of virtual machines you want to provision). For a test EMR cluster, I usually select spot pricing. As of the writing of this post, an on-demand M4.LARGE EC2 instance costs $0.10 per hour. I can typically get the same machine for $0.04, which includes a 32 GB SSD drive.

Step three defines the general cluster settings. Be sure to check “Logging” so you can troubleshoot if your Spark cluster doesn’t start. Next, configure a custom bootstrap action (You can download the file here).

The script performs the following steps:

  1. Installation of the python packages sagemaker_pyspark, boto3, and sagemaker for python 2.7 and 3.4
  2. Installation of the Snowflake JDBC and Spark drivers. As of writing this post, the newest versions are 3.5.3 (JDBC) and 2.3.1 (Spark 2.11)
  3. Creation of a script to update the extraClassPath for the properties spark.driver and spark.executor
  4. Creation of a start script to call the script listed above

Step 4 may not look familiar to some of you; however, it’s necessary because when AWS creates the EMR servers, it also starts the bootstrap action. At this stage, the Spark configuration files aren’t yet installed; therefore the extra CLASSPATH properties can’t be updated. Step 4 starts a script that will wait until the EMR build is complete, then run the script necessary for updating the configuration.
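
The actual bootstrap action is a shell script that isn’t reproduced here, so the following is only a sketch of the wait-then-update idea in Python. All names are hypothetical, and the configuration format assumed is the plain "key value" layout of spark-defaults.conf.

```python
import os
import time


def wait_for_file(path, timeout_seconds, poll_seconds=5.0):
    """Poll until path exists; return True if it appeared before the timeout."""
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        if os.path.exists(path):
            return True
        time.sleep(poll_seconds)
    return os.path.exists(path)


def add_to_classpath(conf_text, key, jar_paths):
    """Append jar_paths to the 'key value' line for key, or add the line if absent."""
    addition = ":".join(jar_paths)
    lines = conf_text.splitlines()
    for i, line in enumerate(lines):
        parts = line.split(None, 1)
        if parts and parts[0] == key:
            current = parts[1].strip() if len(parts) > 1 else ""
            lines[i] = key + " " + (current + ":" + addition if current else addition)
            break
    else:
        # The key was not present, so add a new line for it.
        lines.append(key + " " + addition)
    return "\n".join(lines) + "\n"


# Illustrative configuration text and jar path; both are placeholders.
sample_conf = "spark.master yarn\nspark.driver.extraClassPath /usr/lib/example/a.jar\n"
print(add_to_classpath(sample_conf, "spark.driver.extraClassPath", ["/opt/example/snowflake.jar"]))
```

In a real bootstrap flow, wait_for_file would be called first on the Spark configuration file, and add_to_classpath would then be applied once for spark.driver.extraClassPath and once for spark.executor.extraClassPath.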

The last step required for creating the Spark cluster focuses on security.

To enable the permissions necessary to decrypt the credentials configured in the Jupyter Notebook, you must first grant the EMR nodes access to the Systems Manager. In part 3 of this blog series, decryption of the credentials was managed by a process running with your account context, whereas here, in part 4, decryption is managed by a process running under the EMR context. As such, the EMR process context needs the same Systems Manager permissions granted by the policy created in part 3, which is the SagemakerCredentialsPolicy.

Next, click on “EMR_EC2_DefaultRole” and “Attach policy,” then, find the SagemakerCredentialsPolicy.

At this stage, you must grant the Sagemaker Notebook instance permissions so it can communicate with the EMR cluster. Start by creating a new security group. (I named mine SagemakerEMR). Within the SagemakerEMR security group, you also need to create two inbound rules.

The first rule (SSH) enables you to establish an SSH session from the client machine (e.g. your laptop) to the EMR master. While this step isn’t necessary, it makes troubleshooting much easier.

The second rule (Custom TCP) is for port 8998, which is the Livy API. This rule enables the Sagemaker Notebook instance to communicate with the EMR cluster through the Livy API. The easiest way to accomplish this is to create the Sagemaker Notebook instance in the default VPC, then select the default VPC security group as a source for inbound traffic through port 8998.

After you’ve created the new security group, select it as an “Additional Security Group” for the EMR Master.

Next, click “Create Cluster” to launch the roughly 10-minute process. When the cluster is ready, it will display as “waiting.”

You now have your EMR cluster. Next, you need to find the local IP for the EMR Master node because the EMR master node hosts the Livy API, which is, in turn, used by the Sagemaker Notebook instance to communicate with the Spark cluster. To find the local IP, select your cluster, the hardware tab and your EMR Master. Next, scroll down to find the private IP and make note of it as you will need it for the Sagemaker configuration.
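
Once you have the private address, a quick way to confirm that the security group rule for port 8998 works is to ask Livy for its session list from a machine inside the VPC. The sketch below uses only the standard library. The host name is a hypothetical placeholder, and list_livy_sessions will only succeed where the EMR master is actually reachable.

```python
import json
import urllib.request


def livy_sessions_url(host, port=8998):
    """Build the URL of Livy's session-listing endpoint."""
    return "http://{}:{}/sessions".format(host, port)


def list_livy_sessions(host, port=8998, timeout_seconds=5):
    """Return the parsed JSON from GET /sessions (needs network access to the master)."""
    url = livy_sessions_url(host, port)
    with urllib.request.urlopen(url, timeout=timeout_seconds) as resp:
        return json.loads(resp.read().decode("utf-8"))


# Hypothetical private DNS name; replace with the value noted from the EMR console.
print(livy_sessions_url("ip-10-0-0-1.ec2.internal"))
```

A timeout from list_livy_sessions usually points at the security group or subnet setup rather than at Spark itself.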

Build the Sagemaker Notebook instance

To utilize the EMR cluster, you first need to create a new Sagemaker Notebook instance in a VPC. To minimize inter-AZ network traffic, I usually co-locate the notebook instance on the same subnet I use for the EMR cluster. Finally, choose the VPC’s default security group as the security group for the Sagemaker Notebook instance (Note: For security reasons, direct internet access should be disabled).

When the build process for the Sagemaker Notebook instance is complete, download the Jupyter Spark-EMR-Snowflake Notebook to your local machine, then upload it to your Sagemaker Notebook instance.

Next, review the first task in the Sagemaker Notebook and update the environment variable EMR_MASTER_INTERNAL_IP with the internal IP from the EMR cluster and run the step (Note: In the example above, it appears as ip-172-31-61-244.ec2.internal).

If the Sparkmagic configuration file doesn’t exist, this step will automatically download the Sparkmagic configuration file, then update it so that it points to the EMR cluster rather than the localhost. For the change to take effect, restart the kernel.

%%bash
EMR_MASTER_INTERNAL_IP=ip-172-31-58-190.ec2.internal
CONF=/home/ec2-user/.sparkmagic/config.json
if [[ ! -e $CONF.bk ]]
then
   wget "https://raw.githubusercontent.com/jupyter-incubator/sparkmagic/master/sparkmagic/example_config.json" \
-P /home/ec2-user/.sparkmagic -O /home/ec2-user/.sparkmagic/config.json.bk 2>/dev/null
fi
cat $CONF.bk | sed "s/localhost/$EMR_MASTER_INTERNAL_IP/" > $CONF.new
if [[ $(diff $CONF.new $CONF) ]]
then
   echo "Configuration has changed; Restart Kernel"
fi
cp $CONF.new $CONF
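
The sed command in the step above replaces localhost wherever it appears in the file. If you would rather make the change structurally, the same idea can be sketched in Python by editing only the url entries of the credentials sections. The function name is hypothetical, and the sample dictionary is a trimmed-down illustration rather than the full example_config.json.

```python
import json


def point_config_at_host(config, host):
    """Return a copy of the config with 'localhost' replaced in each credentials URL."""
    updated = json.loads(json.dumps(config))  # deep copy via a JSON round trip
    for section, values in updated.items():
        if section.endswith("_credentials") and isinstance(values, dict) and "url" in values:
            values["url"] = values["url"].replace("localhost", host)
    return updated


# Trimmed-down illustrative config; the real file has more keys.
SAMPLE_CONFIG = {
    "kernel_python_credentials": {"username": "", "password": "", "url": "http://localhost:8998"},
    "kernel_scala_credentials": {"username": "", "password": "", "url": "http://localhost:8998"},
    "logging_config": {"version": 1},
}

new_config = point_config_at_host(SAMPLE_CONFIG, "ip-172-31-58-190.ec2.internal")
print(json.dumps(new_config["kernel_python_credentials"]))
```

Working on a copy leaves the original dictionary untouched, which mirrors how the shell step keeps config.json.bk as a pristine template.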

After restarting the kernel, the following step checks the configuration to ensure that it is pointing to the correct EMR master. If it is correct, the process moves on without updating the configuration.

Upon running the first step on the Spark cluster, the Pyspark kernel automatically starts a SparkContext.

import boto3

params=['/SNOWFLAKE/URL','/SNOWFLAKE/ACCOUNT_ID'
        ,'/SNOWFLAKE/USER_ID','/SNOWFLAKE/PASSWORD'
        ,'/SNOWFLAKE/DATABASE','/SNOWFLAKE/SCHEMA'
        ,'/SNOWFLAKE/WAREHOUSE','/SNOWFLAKE/BUCKET'
        ,'/SNOWFLAKE/PREFIX']

region='us-east-1'

def get_credentials(params):
   ssm = boto3.client('ssm',region)
   response = ssm.get_parameters(
      Names=params,
      WithDecryption=True
   )
   #Build dict of credentials
   param_values={k['Name']:k['Value'] for k in  response['Parameters']}
   return param_values

param_values=get_credentials(params)

 

Congratulations! You have now successfully configured Sagemaker and EMR. You’re now ready to read the dataset from Snowflake. This time, however, there’s no need to limit the number of results and, as you will see, you’ve now ingested 225 million rows.

sfOptions = {
  "sfURL" : param_values['/SNOWFLAKE/URL'],
  "sfAccount" : param_values['/SNOWFLAKE/ACCOUNT_ID'],
  "sfUser" : param_values['/SNOWFLAKE/USER_ID'],
  "sfPassword" : param_values['/SNOWFLAKE/PASSWORD'],
  "sfDatabase" : param_values['/SNOWFLAKE/DATABASE'],
  "sfSchema" : param_values['/SNOWFLAKE/SCHEMA'],
  "sfWarehouse" : param_values['/SNOWFLAKE/WAREHOUSE'],
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query", \
"select (V:main.temp_max - 273.15) * 1.8000 + 32.00 as temp_max_far, " +\
"       (V:main.temp_min - 273.15) * 1.8000 + 32.00 as temp_min_far, " +\
"       cast(V:time as timestamp) time, " +\
"       V:city.coord.lat lat, " +\
"       V:city.coord.lon lon " +\
"from snowflake_sample_data.weather.weather_14_total").load()
df.describe().show()

Conclusion

Cloud-based SaaS solutions have greatly simplified the build-out and setup of end-to-end machine learning (ML) solutions and have made ML available to even the smallest companies. What once took a significant amount of time, money and effort can now be accomplished with a fraction of the resources.

For more information on working with Spark, please review the excellent two-part post from Torsten Grabs and Edward Ma. The first part, Why Spark, explains the benefits of using Spark and how to use the Spark shell against an EMR cluster to process data in Snowflake. The second part, Pushing Spark Query Processing to Snowflake, provides an excellent explanation of how Spark with query pushdown provides a significant performance boost over regular Spark processing.

A Sagemaker / Snowflake setup makes ML available to even the smallest budget. That leaves only one question. What will you do with your data?


Boost Your Analytics with Machine Learning and Advanced Data Preparation

Enterprises can now harness the power of Apache Spark to quickly and easily prepare data and build Machine Learning (ML) models directly against that data in Snowflake. Snowflake and Qubole make it easy to get started by embedding required drivers, securing credentials, simplifying connection setup and optimizing in-database processing. Customers can focus on getting started quickly with their data preparation and ML initiatives instead of worrying about complex integrations and the cost of moving large data sets.

Setting up the Snowflake connection and getting started takes only a few minutes. Customers first create a Snowflake data store in Qubole and enter details for their Snowflake data warehouse. All drivers and packages are preloaded and kept up to date, eliminating manual bootstrapping of jars into the Spark cluster. There is no further configuration or tuning required and there are no added costs for the integration. Once the connection is saved, customers can browse their Snowflake tables, view metadata and see a preview of the Snowflake data all from the Qubole interface. They can then use Zeppelin notebooks to get started reading and writing data to Snowflake as they begin exploring data preparation and ML use cases.

Below is an example of the object browser view showing the available tables and properties:

Security is also handled seamlessly so customers can focus on getting started with their data, without the worry of over-protecting their credentials. Qubole provides centralized and secure credential management which eliminates the need to specify any credentials in plain text. Username and password are entered only when setting up the data store, but are otherwise inaccessible.

The solution is also designed for enterprise requirements and allows customers to use federated authentication and SSO via the embedded Snowflake drivers. With SSO enabled, customers can authenticate through an external, SAML 2.0-compliant identity provider (IdP) and achieve a higher level of security and simplicity. These capabilities help customers more easily share notebooks and collaborate on projects with little risk of sensitive information being exposed.

Below is a sample Scala program showing how to read from Snowflake using the data store object without specifying any credentials in plain text:

Beyond the simplicity and security of the integration, which helps customers get started quickly, customers will also benefit from a highly optimized Spark integration that uses Snowflake query pushdown to balance the query-processing power of Snowflake with the computational capabilities of Apache Spark. From simple projection and filter operations to advanced operations such as joins, aggregations and even scalar SQL functions, query pushdown runs these operations in Snowflake (where the data resides) to help refine and pre-filter the data before it is read into Spark. The traditional performance and cost challenges associated with moving large amounts of data for processing are thereby eliminated without additional overhead or management.

With Snowflake and Qubole, customers get an optimized platform for advanced data preparation and ML that makes it simple to get started. Customers can complement their existing cloud data warehouse strategy or get more value out of ML initiatives by opening access to more data. To learn more about ML and advanced data preparation with Qubole, visit the Qubole blog.

Try Snowflake for free. Sign up and receive $400 worth of free usage. You can create a sandbox or launch a production implementation from the same Snowflake environment.

Quickstart Guide for Sagemaker + Snowflake (Part One)

Machine Learning (ML) and predictive analytics are quickly becoming irreplaceable tools for small startups and large enterprises. The questions that ML can answer are boundless. For example, you might want to ask, “What job would appeal to someone based on their interests or the interests of jobseekers like them?” Or, “Is this attempt to access the network an indication of an intruder?” Or, “What type of credit card usage indicates fraud?”

Setting up a machine learning environment, in particular for on-premise infrastructure, has its challenges. An infrastructure team must request physical and/or virtual machines and then build and integrate those resources. This approach is both time-consuming and error prone due to the number of manual steps involved. It may work in a small environment, but the task becomes exponentially more complicated and impractical at scale.

There are many different ML systems to choose from, including TensorFlow, XGBoost, Spark ML and MXNet, to name a few. They all come with their own installation guides, system requirements and dependencies. What’s more, implementation is just the first step. The next challenge is figuring out how to make the output from the machine learning step (e.g. a model) available for consumption. Then, all of the components for building a model within the machine learning tier and the access to the model in the API tier need to scale to provide predictions in real-time. Last but not least, the team needs to figure out where to store all the data needed to build the model.

Managing this whole process from end-to-end becomes significantly easier when using cloud-based technologies. The ability to provision infrastructure on demand (IaaS) solves the problem of manually requesting virtual machines. It also provides immediate access to compute resources whenever they are needed. But that still leaves the administrative overhead of managing the ML software and the platform to store and manage the data.   

At last year’s AWS developer conference, AWS announced Sagemaker, a “Fully managed end-to-end machine learning service that enables data scientists, developers and machine learning experts to quickly build, train and host machine learning models at scale.”

Sagemaker can access data from many different sources through its underlying kernels (such as Python, PySpark, Spark and R), including data stored in Snowflake. Storing data in Snowflake also has significant advantages.

Single source of truth

If data is stored in multiple locations, inevitably those locations will get out of sync. Even if data is supposed to be immutable, often one location is modified to fix a problem for one system while other locations are not. In contrast, if data is stored in one central, enterprise-grade, scalable repository, it serves as a “single source of truth” because keeping data in sync is made easy. Different tools are not required for structured and semi-structured data. Data can be modified transactionally, which immediately reduces the risk of problems.

Shorten the data preparation cycle

According to a study published in Forbes, data preparation accounts for about 80% of the work performed by data scientists. Shortening the data preparation cycle therefore has a major impact on the overall efficiency of data scientists.

Snowflake is uniquely positioned to shorten the data preparation cycle due to its excellent support for both structured and semi-structured data in a single language, SQL. This means that semi-structured data and structured data can be seamlessly parsed, combined, joined and modified through SQL statements in set-based operations. This enables data scientists to use the power of a full SQL engine for rapid data cleansing and preparation.

Scale as you go

Another problem that ML implementers frequently encounter is what we at Snowflake call “works on my machine” syndrome. Small datasets easily work on a local machine but when migrated to production dataset size, reading all the data into a single machine doesn’t scale, or it may behave unexpectedly. Even if it does finish the job, it can take hours to load a terabyte-sized dataset. In Snowflake there is no infrastructure that needs to be provisioned, and Snowflake’s elasticity feature allows you to scale horizontally as well as vertically, all with the push of a button.

Connecting Sagemaker and Snowflake

Sagemaker and Snowflake both utilize infrastructure-as-a-service offerings from AWS, which enable us to build the infrastructure when we need it, where we need it (geographically) and at any scale required.

Since building the services becomes simplified with Sagemaker and Snowflake, the question becomes how to connect the two services. And that’s exactly the subject of the following parts of this post. How do you get started? What additional configuration do you need in AWS for security and networking? How do you store credentials?

In part two of this four-part blog, I’ll explain how to build a Sagemaker ML environment in AWS from scratch. In the third post, I will put it all together and show you how to connect a Jupyter Notebook to Snowflake via the Snowflake Python connector.  With the Python connector, you can import data from Snowflake into a Jupyter Notebook. Once connected, you can begin to explore data, run statistical analysis, visualize the data and call the Sagemaker ML interfaces.

However, to perform any analysis at scale, you really don’t want to use a single-server setup like Jupyter running a Python kernel. Jupyter running a PySpark kernel against a Spark cluster on EMR is a much better solution for that use case. So, in part four of this series I’ll connect a Jupyter Notebook to a local Spark instance and an EMR cluster using the Snowflake Spark connector.

The Snowflake difference

Snowflake is the only data warehouse built for the cloud. Snowflake delivers the performance, concurrency and simplicity needed to store and analyze all data available to an organization in one location. Snowflake’s technology combines the power of data warehousing, the flexibility of big data platforms, the elasticity of the cloud, and live data sharing at a fraction of the cost of traditional solutions. Snowflake: Your data, no limits.

You can review the entire blog series here: Part One > Part Two > Part Three > Part Four.  

New Snowflake features released in Q1’17

We recently celebrated an important milestone in reaching 500+ customers since Snowflake became generally available in June 2015. As companies of all sizes increasingly adopt Snowflake, we wanted to look back and provide an overview of the major new Snowflake features we released during Q1 of this year, and highlight the value these features provide for our customers.

Expanding global reach and simplifying on-boarding experience

Giving our customers freedom of choice, along with a simple, secure, and guided “Getting Started” experience, was a major focus of the last quarter.

  • We added a new region outside of the US; customers now have the option to analyze and store their data in Snowflake accounts deployed in EU-Frankfurt. Choosing the appropriate region is integrated into our self-service portal when new customers sign up.
  • In addition, we added our high-value product editions, Enterprise and Enterprise for Sensitive Data (ESD), to our self-service offerings across all available regions. For example, with Enterprise, customers can quickly implement auto-scale mode for multi-cluster warehouses to support varying, high concurrency workloads. And customers requiring HIPAA compliance can choose ESD.
  • Exploring other venues for enabling enterprises to get started quickly with Snowflake, we partnered with the AWS Marketplace team to include our on-demand Snowflake offerings, including the EU-Frankfurt option, in their newly-launched SaaS subscriptions.

Improving out-of-the-box performance & SQL coverage

We are committed to building the fastest cloud DW for your concurrent workloads with the SQL you love.

  • One key performance improvement introduced this quarter was the reduction of compilation times for JSON data. Internal TPC-DS tests demonstrate a reduction between 30-60% for most of the TPC-DS queries (single stream on a single, 100TB JSON table). In parallel, we worked on improving query compile time in general, providing up to a 50% improvement in performance for short queries.
  • Another new key capability is the support for bulk data inserts on a table concurrently with other DML operations (e.g. DELETE, UPDATE, MERGE). By introducing more fine-grained locking at the micro-partition level, we are able to allow concurrent DML statements on the same table.
  • To improve our data clustering feature (currently in preview), we added support for specifying expressions on table columns in clustering keys. This enables more fine-grained control over the data in the columns used for clustering.
  • Also, we reduced the startup time for virtual warehouses (up to XL in size) to a few seconds, ensuring almost instantaneous provisioning for most virtual warehouses.
  • We extended our SQL by adding support for the ANSI SQL TABLESAMPLE clause. This is useful when a user wants to limit a query operation performed on a table to only a random subset of rows from the table.

Staying Ahead with Enterprise-ready Security

From day one, security has always been core to Snowflake’s design.

  • We expanded Snowflake’s federated authentication and single sign-on capability by integrating with many of the most popular SAML 2.0-compliant identity providers. In addition to Okta, Snowflake now supports ADFS/AD, Azure AD, Centrify, and OneLogin, to name just a few.
  • To advance Snowflake’s built-in auditing, we introduced new Information Schema table functions (LOGIN_HISTORY and LOGIN_HISTORY_BY_USER) that users can query to retrieve the short-term history of all successful and failed login requests in the previous 7 days. If required, users can maintain a long-term history by copying the output from these functions into regular SQL tables.

Improving our ecosystem

Enabling developers and builders to create applications with their favorite tools and languages remains a high priority for us.

  • With respect to enterprise-class ETL, we successfully collaborated with Talend in building a native Snowflake connector based on Talend’s new and modern connector SDK. The connector, currently in preview, has already been deployed by a number of joint customers with great initial feedback on performance and ease-of-use.
  • To tighten the integration of our Snowflake service with platforms suited for machine learning and advanced data transformations, we released a new version of our Snowflake Connector for Spark, drastically improving performance by pushing more query operations, including JOINs and various aggregation functions, down to Snowflake. Our internal 10 TB TPC-DS performance benchmark tests demonstrate that running TPC-DS queries using this new v2 Spark connector is up to 70% faster compared to executing SQL in Spark with Parquet or CSV (see this Blog post for details).
  • We continue to improve our drivers for our developer community. Listening to feedback from our large Python developer community, we worked on a new version of Snowflake’s native Python client driver, resulting in up to 40% performance improvements when fetching result sets from Snowflake. And, after we open-sourced our JDBC driver last quarter, we have now made the entire source code available on our official GitHub repository.
  • And, last, but not least, to enhance our parallel data loading via the COPY command, ETL developers can now dynamically add file metadata information, such as the actual file name and row number, which might not be part of the initial payload.

Increasing transparency and usability

These features are designed to strike the right balance between offering a service that is easy to operate and exposing actionable insights into the running service.

  • One major addition to our service is Query Profile, now generally available and fully integrated into Snowflake’s web interface. Query Profile is a graphical tool you can use to detect performance bottlenecks and areas for improving query performance.
  • Various UI enhancements were implemented: Snowflake’s History page now supports additional filtering by the actual SQL text and query identifier. We also added UI support for creating a Parquet file format in preparation for loading Parquet data into variant-type table columns in Snowflake.
  • A new Information Schema table function (TABLE_STORAGE_METRICS) exposes information about the data storage for individual tables. In particular, a user can now better understand how tables are impacted by Continuous Data Protection, particularly Time Travel and Fail-safe retention periods, as well as which tables contain cloned data.
  • We also recently introduced smarter virtual warehouse billing through Warehouse Billing Continuation (see this Blog post for details). If a warehouse is suspended and resumed within 60 minutes of the last charge, we do not charge again for the servers in the warehouse. WBC eliminates additional credit charges, and we hope it will reduce the need for our customers to strictly monitor and control when warehouses are suspended and resized.

Scaling and investing in service robustness

These service enhancements aren’t customer visible, but are crucial for scaling to meet the demands of our rapidly growing base of customers.

  • As part of rolling out the new EU (Frankfurt) region, we increased automation of our internal deployment procedures to (a) further improve engineering efficiency while (b) laying the foundation for rapidly adding new regions based on customer feedback.
  • We further streamlined and strengthened our various internal testing and pre-release activities, allowing us to ship new features to our customers on a weekly basis – all in a fully transparent fashion with no downtime or impact to users.

Conclusion and Acknowledgements

This summary list of features delivered in Q1 highlights the high velocity and broad range of features the Snowflake Engineering Team has successfully delivered in a short period of time. We are committed to putting our customers first and maintaining this steady pace of shipping enterprise-ready features each quarter. Stay tuned for another feature-rich Q2.

For more information, please feel free to reach out to us at info@snowflake.net. We would love to help you on your journey to the cloud. And keep an eye on this blog or follow us on Twitter (@snowflakedb) to keep up with all the news and happenings here at Snowflake Computing.

Snowflake and Spark, Part 2: Pushing Spark Query Processing to Snowflake

Welcome to the second post in our 2-part series describing Snowflake’s integration with Spark. In Part 1, we discussed the value of using Spark and Snowflake together to power an integrated data processing platform, with a particular focus on ETL scenarios.

In this post, we change perspective and focus on performing some of the more resource-intensive processing in Snowflake instead of Spark, which results in significant performance improvements. As part of this, we walk you through the details of Snowflake’s ability to push query processing down from Spark into Snowflake. We also touch on how this pushdown can help you transition from a traditional ETL process to a more flexible and powerful ELT model.

Query pushdown is supported with v2.1 (and later) of the Snowflake Connector for Spark. As you explore the capabilities of the connector, make sure you are using the latest version, available from Spark Packages or Maven Central (source code on GitHub).

Overview of Querying in Spark with Snowflake

Before we get into the specifics of query pushdown, let’s review the basic query flow between Spark and Snowflake. Processing queries with the Snowflake Connector for Spark involves the same steps as data loading (as discussed in Part 1 of this Blog series), but in a slightly different order:

  1. The Spark driver sends the SQL query to Snowflake using a Snowflake JDBC connection.
  2. Snowflake uses a virtual warehouse to process the query and copies the query result into AWS S3.
  3. The connector retrieves the data from S3 and populates it into DataFrames in Spark.
Figure 1: Query flow from Spark to Snowflake

As with data loading (discussed in Part 1), note how the Snowflake worker nodes (i.e. servers in the virtual warehouse) perform all the heavy processing for the data egress, and the slave nodes in the Spark cluster perform the data ingress. This allows you to size your Snowflake virtual warehouse and Spark clusters to balance compute capacity and IO bandwidth against S3 for optimal performance. Assuming unbounded ingress and egress capacity on S3, this approach gives you virtually unlimited capacity for transferring data back and forth between Spark and Snowflake by simply scaling both clusters to the levels that your workload requires.

Highly Optimized Performance through Query Pushdown

In earlier versions of the Spark connector, Spark’s PrunedFilteredScan interface allowed simple projection and filter operations (e.g. .select(.) and .filter(.) in Scala) to be translated and pushed to Snowflake, instead of being processed in Spark. These Snowflake optimizations were helpful in many situations; however, other operations, such as joins, aggregations, and even scalar SQL functions, could only be performed in Spark. This approach is typically not ideal for more capable Spark data sources, such as Snowflake, which can perform these functions more efficiently.

Starting with v2.1, the connector introduces advanced optimization capabilities for better performance. At the heart of the performance optimizations is the ability to push down queries to Snowflake. With Snowflake as the data source for Spark, v2.1 of the connector can push large and complex Spark logical plans (in their entirety or in parts) to be processed in Snowflake, thus enabling Snowflake to do more of the work and leverage its performance efficiencies. This capability establishes a tight integration between the two systems and combines the powerful query-processing of Snowflake with the computational capabilities of Apache Spark and its ecosystem.

Enabling Query Pushdown

Enable the query pushdown feature for the connector using the following static method call:

SnowflakeConnectorUtils.enablePushdownSession(spark)

Why Pushdown?

Users of both Snowflake and Spark may find that a large amount of the data they would like to use resides in Snowflake. A federated setup exists when two or more interconnected systems can process all or parts of a particular data task flow, leading to the common question of where different parts of the computation should occur. A common concern with federated setups is performance for processing large data sets. For the best performance, you typically want to avoid reading lots of data or transferring large intermediate results between the interconnected systems. Ideally, most of the processing happens close to where the data is stored to leverage any capabilities of the participating stores to dynamically eliminate data that is not needed.

Spark already supports a good set of functionality for relational data processing, as well as connectivity with a variety of data sources, including the columnar Parquet format. Snowflake, however, can achieve much better query performance via efficient pruning of data enabled through our micro-partition metadata tracking and clustering optimizations (see the Snowflake documentation for more details). This metadata allows Snowflake to scan data more efficiently when given query predicates by using aggregate information on micro-partitions, such as min and max values, since data that is determined not to contain relevant values can be skipped entirely. Additionally, metadata such as cardinality of column values (number of distinct values), allows Snowflake to better optimize for operations such as join ordering.
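To make the min/max pruning idea concrete, here is a minimal sketch in plain Scala. It is not Snowflake's implementation: the `PartitionMeta` class, its field names, and the `partitionsToScan` helper are hypothetical, and the sketch only handles a single "greater than" predicate on one column.

```scala
object PruningSketch {
  // Hypothetical per-partition metadata: the smallest and largest
  // zip_code value stored in each micro-partition.
  case class PartitionMeta(id: Int, minZip: Int, maxZip: Int)

  // For a predicate "zip_code > lowerBoundExclusive", a partition whose
  // maximum value is not above the bound cannot contain a matching row,
  // so it can be skipped without reading any of its data.
  def partitionsToScan(parts: Seq[PartitionMeta], lowerBoundExclusive: Int): Seq[Int] =
    parts.filter(_.maxZip > lowerBoundExclusive).map(_.id)
}
```

With three partitions covering the zip code ranges 10000-50000, 90000-98000 and 97000-99000, a filter of zip_code > 98000 only needs to scan the third one.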

Given that filter, projection, join, and aggregation operations on data all have the potential to significantly reduce the result set of a given query, the data pruning used by Snowflake can and should be leveraged. This also has the benefit of reducing data that has to be transferred to Spark via S3 and the network, which in turn improves response times.

To support pushing more work to Snowflake, the Snowflake connector integrates deeply with the query plan generation process in Spark.

Query Plan Generation

To understand how query pushdown works, let’s take a look at the typical process flow of a Spark DataFrame query. Spark contains its own optimizer, Catalyst, that performs a set of source-agnostic optimizations on the logical plan of a DataFrame (predicate pushdowns, constant folding, etc.). DataFrames are executed lazily. This means Spark can evaluate and optimize relational operators applied to a DataFrame and only execute the DataFrame when an action is invoked. Consider the following expansion on our zip code example from Part 1 of the series:

val dfZipCodes = spark.read.format(...).option(...,...).load()
val dfFilteredZips = dfZipCodes.filter("zip_code < 98000")
val dfCities = dfFilteredZips.select("city")

The same example can also be expressed as:

dfZipCodes.createOrReplaceTempView("temp_zip_codes")
val dfSQLCities = spark.sql("SELECT city from temp_zip_codes WHERE zip_code < 98000")

In either case, Spark delays planning and executing the code until an action such as collect(), show(), or count() occurs.
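The deferred execution described above can be illustrated without a Spark cluster. The following sketch is only an analogy using a plain Scala Iterator, not Spark's DataFrame API; the object and method names are hypothetical. It counts how many rows the filter has inspected before and after the "action" (here, toList).

```scala
object LazyEvaluationSketch {
  // Returns (rows inspected before the action,
  //          rows inspected after the action,
  //          the filtered result).
  def run(zips: List[Int]): (Int, Int, List[Int]) = {
    var inspected = 0
    // Like a DataFrame transformation, Iterator.filter only records
    // what should happen; no element is inspected yet.
    val filtered = zips.iterator.filter { z => inspected += 1; z < 98000 }
    val beforeAction = inspected
    // The "action": materializing the result forces evaluation.
    val result = filtered.toList
    (beforeAction, inspected, result)
  }
}
```

Calling LazyEvaluationSketch.run on a three-element list reports zero inspected rows before the action and three afterwards, which mirrors how Spark does no work until collect(), show(), or count() is invoked.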

When an action is required, Spark’s optimizer, Catalyst, first produces an optimized logical plan. The process then moves to the physical planning stage. This is where Spark determines whether to push down a query to Snowflake, as shown in the following diagram:

Figure 2: Location of Snowflake alternative physical plan in a Catalyst query plan
(based on an image originally published in this Databricks blog post)

Structure of a Snowflake Plan

So, how does the connector allow query pushdown to happen? With query pushdown enabled, Catalyst inserts a Snowflake plan as one possible physical plan for Spark to choose based on cost, as illustrated in the diagram above.

Input: After passing through Catalyst, a DataFrame is represented as a logical plan tree, with nodes representing data sources and operators. For example, consider the following code:

val dfZips = spark.read.format("net.snowflake.spark.snowflake").option("dbtable","zip_codes").load()
val dfMayors = spark.read.format("net.snowflake.spark.snowflake").option("dbtable","city_mayors").load()
val dfResult = dfZips.filter("zip_code > 98000").join(dfMayors.select($"first",$"last",$"city",$"city_id"), dfZips("city_id") === dfMayors("city_id"), "inner")

DataFrame dfResult may be internally represented by Spark in a data structure similar to the following:

Figure 3: Data structure representation of join on two tables with filtering and projection

The tree represents a join of two Snowflake tables, after applying a filter on the left-side relation (zip_codes table) and a projection on the right-side relation (city_mayors table).

Translation: The connector traverses the above data structure and procedurally generates a Snowflake plan to execute it. In previous iterations of our connector, Spark performed the join on zip_codes and city_mayors. With the new feature enabled, however, the connector is able to verify that zip_codes and city_mayors are joinable relations within Snowflake and thus recognize that the join can be performed completely in Snowflake.

This same process can also be applied to SORT, GROUP BY, and LIMIT operations, and more.
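The translation step can be pictured with a toy example. The sketch below is not the connector's code and does not reproduce the SQL it actually generates: the `Plan` node types, the `toSql` function, and the `l`/`r` subquery aliases are all hypothetical. It only shows how a recursive traversal of a plan tree like the one in Figure 3 can yield a single SQL statement that runs entirely in the database.

```scala
object PushdownSketch {
  // A deliberately tiny, hypothetical logical plan model.
  sealed trait Plan
  case class Relation(table: String) extends Plan
  case class Filter(condition: String, child: Plan) extends Plan
  case class Project(columns: Seq[String], child: Plan) extends Plan
  case class Join(left: Plan, right: Plan, condition: String) extends Plan

  // Walk the tree bottom-up, wrapping each child in a subquery.
  def toSql(plan: Plan): String = plan match {
    case Relation(table) =>
      s"SELECT * FROM $table"
    case Filter(condition, child) =>
      s"SELECT * FROM (${toSql(child)}) WHERE $condition"
    case Project(columns, child) =>
      s"SELECT ${columns.mkString(", ")} FROM (${toSql(child)})"
    case Join(left, right, condition) =>
      s"SELECT * FROM (${toSql(left)}) AS l INNER JOIN (${toSql(right)}) AS r ON $condition"
  }

  // The join from the example above, expressed in the toy model.
  val zipsJoinMayors: Plan = Join(
    Filter("zip_code > 98000", Relation("zip_codes")),
    Project(Seq("first", "last", "city", "city_id"), Relation("city_mayors")),
    "l.city_id = r.city_id")
}
```

Calling PushdownSketch.toSql(PushdownSketch.zipsJoinMayors) produces one nested SELECT that contains the filter, the projection, and the join, so only the joined result would need to travel back to Spark.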

Performance Results

Pushing queries down to Snowflake can greatly improve end-to-end performance. To illustrate this, we ran a suite of TPC-DS queries that mirror three different workloads in Cloudera’s Impala benchmarks:

  • Workload A (Interactive Queries)
  • Workload B (Reporting)
  • Workload C (Analytic Queries)

We compared the end-to-end performance between Snowflake and Spark using 10TB scale, with queries executed on a 3X-Large virtual warehouse (for Snowflake) and an equivalent 64-node C3.2XLarge EC2 cluster (for Spark). For each workload, we tested 3 different modes:

  • Spark-Snowflake Integration with Full Query Pushdown: Spark using the Snowflake connector with the new pushdown feature enabled.
  • Spark on S3 with Parquet Source (Snappy): Spark reading from S3 directly with data files formatted as Parquet and compressed with Snappy.
  • Spark on S3 with CSV Source (gzip): Spark reading from S3 directly with data files formatted as CSV and compressed with gzip.

The following 3 charts show the performance comparison (in seconds) for the TPC-DS queries in each workload. Note that the numbers for Spark-Snowflake with Pushdown represent the full round-trip times for Spark to Snowflake and back to Spark (via S3), as described in Figure 1:

  1. Spark planning + query translation.
  2. Snowflake query processing + unload to S3.
  3. Spark read from S3.

The scale for the charts is logarithmic to make reading easier.

Figure 4: Performance comparison between queries in Workload A with pushdown vs no pushdown
Figure 5: Performance comparison between queries in Workload B with pushdown vs no pushdown
Figure 6: Performance comparison between queries in Workload C with pushdown vs no pushdown

As demonstrated, fully pushing query processing to Snowflake provides the most consistent and overall best performance, with Snowflake on average doing better than even native Spark-with-Parquet.

Note that the columnar format of Parquet is sometimes leveraged by Spark for efficient pruning of unneeded data, but Snowflake answers many of the more complex queries significantly faster than Spark-with-Parquet, e.g. queries 59 and 79 in Workload C (Analytics).

ETL vs ELT

With traditional ETL, most data transformation (filtering, sorting, etc.) typically takes place before loading to limit the data size and ensure optimal querying performance. Snowflake, with its low storage costs and powerful SQL capabilities, combined with the significant performance improvements provided by query pushdown, enables transitioning to a more modern and effective ELT model, in which you load all your data into Snowflake and then perform any data transformations directly in Snowflake. And all of this is accomplished without any changes to your familiar Spark programming experience using Scala, Python, or SparkSQL.

Summary and Next Steps

This second post in our series about Spark and Snowflake showed how you can use the Snowflake Connector for Spark to realize significant performance improvements by pushing data processing from Spark into Snowflake. This makes Snowflake the data repository of choice for your ELT scenarios, even if you have existing code in Spark for your data ingress pipeline.

So what’s next? We are continuously looking for ways to improve the experience of working with both Spark and Snowflake. Currently, we are exploring removing the requirement for user-managed S3 buckets for data transfer between Spark and Snowflake, and using Snowflake internal stages instead. Keep an eye on this blog to learn more about our progress on this front.

In the meantime, we encourage you to try Snowflake integrated with Spark in your data processing solutions today.

Also, are you interested in helping design and build the next-generation Spark-Snowflake integration? If so, we invite you to take a look at the open Engineering positions on our careers page.

And, as always, you can follow us on Twitter (@snowflakedb) to keep up with all the latest news and happenings here at Snowflake Computing.

Snowflake and Spark, Part 1: Why Spark?

This is the first post in a 2-part series describing Snowflake’s integration with Spark. In this post, we introduce the Snowflake Connector for Spark (package available from Maven Central or Spark Packages, source code on GitHub) and make the case for using it to bring Spark and Snowflake together to power your data-driven solutions.

What is Spark?

Apache Spark is a distributed data processing system with support for functional, declarative and imperative programming styles. Its popularity and power lie in its myriad of programming paradigms, supported APIs (Scala, R, and Python), machine-learning libraries, and tight integration with the Hadoop ecosystem. Spark also provides connectivity to a wide variety of data sources, including SQL-based relational database systems and NoSQL systems. As a result, Spark has become the tool of choice for data engineering tasks.

With the introduction of the Snowflake Connector for Spark in June 2016, Snowflake enabled connectivity to and from Spark. The connector provides the Spark ecosystem with access to Snowflake as a fully-managed and governed repository for all data types, including JSON, Avro, CSV, XML, machine-born data, etc. The connector also enables powerful use cases that integrate Spark and Snowflake, including:

  • Complex ETL: Using Spark, you can easily build complex, functionally rich and highly scalable data ingestion pipelines for Snowflake. With a large set of readily-available connectors to diverse data sources, Spark facilitates data extraction, which is typically the first part in any complex ETL pipeline. Spark also helps with computationally-involved tasks for data transformation such as sessionization, data cleansing, data consolidation, and data unification, which usually happens at later stages in the ETL pipeline. Using the Snowflake Connector for Spark, the data produced by these complex ETL pipelines can now easily be stored in Snowflake for broad, self-service access across the organization using standard SQL and SQL tools.
  • Machine Learning: Spark provides a rich ecosystem for machine learning and predictive analytics functionality, e.g. the popular machine learning library, MLlib. With the integration between Spark and Snowflake, Snowflake provides you with an elastic, scalable repository for all the data underlying your algorithm training and testing. With machine learning, processing capacity needs can fluctuate heavily. Snowflake can easily expand its compute capacity to allow your machine learning in Spark to process vast amounts of data.

Enabling Spark in AWS EMR with Snowflake

With the Snowflake Connector for Spark, you can use Spark clusters, e.g. in AWS EMR or Databricks, and connect them easily with Snowflake. For example, you can create an EMR cluster with Spark pre-installed when selecting Spark as the application. Before we dive into the details of using Snowflake with Spark, the following code samples illustrate how to create and connect to a Spark cluster in AWS EMR and start a spark-shell using the connector:

STEP 1: Create a Spark cluster in AWS EMR 5.4.0 with Spark 2.1 using the AWS CLI. For example, in US-West-2:

aws emr create-cluster \
   --applications Name=Ganglia Name=Spark \
   --ec2-attributes '{"KeyName":"","InstanceProfile":"EMR_EC2_DefaultRole","SubnetId":""}' \
   --service-role EMR_DefaultRole \
   --enable-debugging \
   --release-label emr-5.4.0 \
   --log-uri 's3n:///elasticmapreduce/' \
   --name '' \
   --instance-groups \
      '[{"InstanceCount":1,"InstanceGroupType":"MASTER","InstanceType":"m3.xlarge","Name":"Master Instance Group"},
      {"InstanceCount":2,"InstanceGroupType":"CORE","InstanceType":"m3.xlarge","Name":"Core Instance Group"}]' \
   --configurations '[{"Classification":"spark","Properties":{"maximizeResourceAllocation":"true"},"Configurations":[]}]' \
   --scale-down-behavior TERMINATE_AT_INSTANCE_HOUR \
   --region us-west-2

STEP 2: Connect to the cluster using ssh:

ssh -i ~/.pem hadoop@.compute.amazonaws.com

STEP 3: Start spark-shell with the Snowflake connector packages. Alternatively, you can also pre-load the packages using the packages option when creating the cluster. For example:

spark-shell --packages net.snowflake:snowflake-jdbc:3.0.14,net.snowflake:spark-snowflake_2.11:2.1.3

Where:

  • spark-snowflake_2.11 specifies the connector artifact ID (for Scala 2.11).
  • 2.1.3 specifies the connector version. Note that this version is for Spark 2.1. For Spark 2.0, use 2.1.3-spark_2.0 instead.

Also, note that, if you are not running from an EMR cluster, you need to add the package for AWS support to the packages list. For instance, when you run spark-shell from a local installation, your packages list will look like this:

spark-shell --packages net.snowflake:snowflake-jdbc:3.0.14,net.snowflake:spark-snowflake_2.11:2.1.3,org.apache.hadoop:hadoop-aws:2.8.0

STEP 4: In spark-shell, you then need to define which Snowflake database and virtual warehouse to use. The connector also needs access to a staging area in AWS S3 which needs to be defined. You can do that with the following Scala commands in spark-shell:

// Configuration of the staging area for the connector in AWS S3 
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "")

// Snowflake instance parameters
val defaultOptions = Map(
  "sfURL" -> "",
  "sfAccount" -> "",
  "sfUser" -> "",
  "sfPassword" -> "",
  "sfDatabase" -> "",
  "sfSchema" -> "public",
  "sfWarehouse" -> "",
  "awsAccessKey" -> sc.hadoopConfiguration.get("fs.s3n.awsAccessKeyId"),
  "awsSecretKey" -> sc.hadoopConfiguration.get("fs.s3n.awsSecretAccessKey"),
  "tempdir" -> "s3n:///"
)
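Because every value in the map above starts out as an empty placeholder, it can help to check the map before handing it to the connector. The following is a minimal sketch in plain Scala; the `ConnectorOptionsCheck` object, the `missingOptions` helper, and the choice of which keys count as required are assumptions based on the keys shown above, not validation performed by the connector itself.

```scala
object ConnectorOptionsCheck {
  // Keys taken from the defaultOptions map above. Treating all of them
  // as required is an assumption made for this sketch.
  val requiredKeys: Seq[String] = Seq(
    "sfURL", "sfAccount", "sfUser", "sfPassword",
    "sfDatabase", "sfSchema", "sfWarehouse", "tempdir")

  // Returns the required keys that are absent, null, or blank.
  def missingOptions(options: Map[String, String]): Seq[String] =
    requiredKeys.filter { key =>
      options.get(key).flatMap(Option(_)).forall(_.trim.isEmpty)
    }
}
```

In spark-shell, ConnectorOptionsCheck.missingOptions(defaultOptions) returns an empty list once every placeholder has been filled in.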

With these steps, you now have a Spark cluster and a spark-shell running in AWS EMR that you can use to showcase some of the most common use cases, such as complex ETL and machine learning.

Overview of Loading Data from Spark into Snowflake

Before we get into our discussion of ETL using Spark and Snowflake, let’s first take a look at what happens behind the scenes when the Snowflake Spark connector persists Spark data frames in Snowflake.

For data loading operations, the Spark connector for Snowflake performs three separate steps, similar to the spark-redshift connector:

  1. The first step persists the contents of the data frame in the staging area in AWS S3. This step currently uses the S3 bucket provided during the configuration of the Spark connector (as described in the Snowflake documentation). Future versions of the connector will not require an S3 bucket.
  2. The second step then connects to Snowflake from Spark using JDBC. On that JDBC connection, it issues a COPY command to load the data.
  3. As the third and final step, the COPY command retrieves the data from the staging area in S3 and uses the current virtual warehouse to load it into tables in the Snowflake database.

Note that in this flow, the slave nodes in the Spark cluster and the compute nodes in Snowflake (i.e. the virtual warehouse) process the data. This approach allows for much greater scale than a more conventional approach where the data flows through the JDBC connection. Here you can scale the Spark cluster or the Snowflake virtual warehouse independently to increase data transfer bandwidth between Spark and Snowflake, whereas with the conventional approach your bandwidth is always limited to that of a single JDBC connection.

Complex ETL in Spark with Snowflake

Today, data engineers need to continuously extract and transform data from various sources before it can be made available for consumption by business users through querying and analytics. Given the volume of data that today’s organizations face, a significant amount of compute and storage capacity needs to be made available to perform these tasks in a timely and scalable fashion. Spark has become the tool of choice for many data engineers to implement the computation steps along their data processing pipelines. This is due to the high efficiency of Spark with its in-memory execution capabilities, the availability of libraries from the Spark ecosystem, and the ease of development with languages such as Scala and Python.

The following example illustrates how Spark can be used to implement a simple data ingestion pipeline that performs several transformations on the new data before storing it in Snowflake. The example uses a web log scenario. Assume that new data is read from a web server log file, in this case using the Apache web log format. Log lines are made available as a list in Scala. The sample parses the IP addresses from the log lines and transforms them into ZIP codes using REST calls to the FreeGeoIP web service. The resulting list of ZIP codes is then stored in Snowflake.

// *************************************
// Geo mapping of Apache web log lines 
// *************************************
import scala.util.parsing.json.JSON._
import scala.io.Source.{fromInputStream}
import java.net._

// Example data: Apache web server log lines
var weblog: List[String] = List(
 """8.8.8.8 - - [07/Mar/2017:00:05:49 -0800] "GET /foo/index.html HTTP/1.1" 401 12846""",
 """8.8.8.8 - - [07/Mar/2017:00:06:51 -0800] "GET /bar/index.html HTTP/1.1" 200 4523""")
// Function to parse a web log line and look up the zip code for its IP address
def ParseLogLine (strLine : String) : Integer = {
 // Split the log line into its ten space-separated fields
 val Array(ip, d1, d2, dt, tz, request, url, proto, ret, size) = strLine.split(" ")

 // REST call to URL
 val url2 = "http://freegeoip.net/json/".concat(s"$ip")
 val result2 = scala.io.Source.fromURL(url2).mkString

 // Parse into JSON
 val json: Option[Any] = parseFull(result2)
 val map: Map[String,Any] = json.get.asInstanceOf[Map[String, Any]]
 val zip: String = map("zip_code").asInstanceOf[String]

 // Convert the zip code string to an integer
 val intZip: Integer = zip.trim.toInt

 return intZip
}
// Parse web log
var zips: List[Integer] = weblog.map { line => ParseLogLine(line)
}
// Let's take a look at the zip codes we found in the log
zips.foreach(println(_))

// Get a data frame for the zip codes list
// (toDF() needs the Spark SQL implicits in scope; notebooks usually import them already)
import spark.implicits._
val df = zips.toDF()

// Push the list/df content to Snowflake as a new table
import org.apache.spark.sql.SaveMode
df.write
  .format("net.snowflake.spark.snowflake")
  .options(defaultOptions)
  .option("dbtable", "zip_codes")
  .mode(SaveMode.Overwrite)
  .save()

Now we have the ZIP codes in Snowflake and can start using them in Snowflake queries and in BI tools that connect to Snowflake.
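The sample above splits each log line on spaces, which works only when a line has exactly ten space-separated fields. As a minimal sketch of a more tolerant parsing step, the following uses a regular expression and returns None for lines that do not match. The names LogEntry, WebLogParser and parseLine are hypothetical and introduced here for illustration only, and the pattern assumes the Apache log layout shown in the sample data.

```scala
// Sketch: parse an Apache web log line with a regular expression.
// LogEntry, WebLogParser and parseLine are hypothetical names used for illustration.
final case class LogEntry(ip: String, timestamp: String, method: String,
                          path: String, protocol: String, status: Int, size: Long)

object WebLogParser {
  // Layout assumed: host ident user [timestamp] "method path protocol" status size
  private val LogLine =
    """^(\S+) \S+ \S+ \[([^\]]+)\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+|-)$""".r

  // Returns None for lines that do not match, instead of throwing a MatchError
  def parseLine(line: String): Option[LogEntry] = line match {
    case LogLine(ip, ts, method, path, proto, status, size) =>
      // Apache writes "-" when no bytes were sent; treat that as zero
      val bytes = if (size == "-") 0L else size.toLong
      Some(LogEntry(ip, ts, method, path, proto, status.toInt, bytes))
    case _ => None
  }
}
```

With this in place, an expression such as weblog.flatMap(line => WebLogParser.parseLine(line)).map(_.ip) would yield the IP addresses of the well-formed lines only, and the REST lookup could then be applied to those.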

Machine Learning in Spark with Snowflake Connectivity

Part of Spark’s appeal is how easy it is to use machine learning capabilities over the data that has been made available to Spark. For example, MLlib, a popular library for machine learning, comes as part of the standard Spark configuration. With these machine learning capabilities in hand, organizations can easily gain new insights and business value from the data that they acquire.

Expanding on our previous web log example, you may wonder which ZIP codes or broader geographical areas the requests in the web server logs are coming from. The following Scala code illustrates how to retrieve a query result from Snowflake into Spark and apply machine learning functions to it:

// *************************************
// Retrieve zip codes stored in Snowflake into Spark
// *************************************

// Function to retrieve a Snowflake query result into a Spark data frame
def snowflakedf(sql: String) = {
 spark.read
 .format("net.snowflake.spark.snowflake")
 .options(defaultOptions)
 .option("query", sql)
 .load()
}

// Snowflake SQL query to retrieve all ZIP codes
val df2 = snowflakedf("SELECT * FROM zip_codes")

// *************************************
// Machine learning over zip codes stored in Snowflake 
// *************************************
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors

// Convert DF with SQL result into Vectors for the MLlib API
val vectors = df2.rdd.map(r => Vectors.dense(r.getDecimal(0).doubleValue()))
vectors.cache()

// Cluster the ZIP codes into two groups with k-means
val numClusters = 2
val numIterations = 20
val clusters = KMeans.train(vectors, numClusters, numIterations)

// Inspect the resulting cluster centers
clusters.clusterCenters

You can now use snowflakedf(...) to define Spark data frames that are populated with the result of any Snowflake query.
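To make the clustering step above more concrete, here is a minimal sketch of the assign-and-update loop behind k-means (Lloyd's algorithm) on one-dimensional values, written in plain Scala. It is not MLlib's implementation, which runs distributed and chooses its own initial centers. The object KMeans1D, its methods, and the explicit initial centers are assumptions made for illustration.

```scala
// Sketch of Lloyd's k-means iteration on one-dimensional data, in plain Scala.
// This is NOT MLlib's implementation; KMeans1D and its methods are hypothetical names.
object KMeans1D {
  // Index of the center closest to x (assumes at least one center)
  def closest(centers: Vector[Double], x: Double): Int =
    centers.indices.minBy(i => math.abs(centers(i) - x))

  // Run a fixed number of assign/update rounds starting from the given centers
  def train(points: Seq[Double], initial: Vector[Double], iterations: Int): Vector[Double] =
    (1 to iterations).foldLeft(initial) { (centers, _) =>
      // Assignment step: group points by their nearest center
      val groups = points.groupBy(p => closest(centers, p))
      // Update step: move each center to the mean of its points (keep it if it has none)
      centers.indices.map { i =>
        groups.get(i).map(ps => ps.sum / ps.size).getOrElse(centers(i))
      }.toVector
    }
}
```

For example, with the made-up ZIP code values 10001, 10002, 94041 and 94043, the call KMeans1D.train(Seq(10001.0, 10002.0, 94041.0, 94043.0), Vector(10001.0, 94043.0), 20) settles on the centers 10001.5 and 94042.0, one per geographic group.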

Summary and Next Steps

As we’ve illustrated in this post, Spark is a powerful tool for data wrangling. Its rich ecosystem provides compelling capabilities for complex ETL and machine learning. With the deep integration into Spark that the connector provides, Snowflake can now serve as the fully managed and governed database for all your Spark data, including traditional relational data, JSON, Avro, CSV, XML, and machine-born data. This makes Snowflake your repository of choice in any Spark-powered solution.

So what’s next? We encourage you to try Snowflake and its integration with Spark in your data processing solutions today.

Also, are you interested in helping design and build the next-generation Spark-Snowflake integration? If so, we invite you to take a look at the open Engineering positions on our careers page.

In part 2 of this series, we’ll take a look behind the scenes to better understand how the Spark connector processes queries that retrieve data from Snowflake into Spark and how this can be used to enable high-performance ELT solutions using Spark and Snowflake. In the meantime, keep an eye on this blog or follow us on Twitter (@snowflakedb) to keep up with all the news and happenings here at Snowflake Computing.